Class: Karafka::Server
- Inherits:
-
Object
- Object
- Karafka::Server
- Defined in:
- lib/karafka/server.rb
Overview
Karafka consuming server class
Class Attribute Summary collapse
-
.execution_mode ⇒ Object
Mode in which the Karafka server is executed.
-
.id ⇒ Object
id of the server.
-
.jobs_queue ⇒ Object
Jobs queue.
-
.listeners ⇒ Object
Set of consuming threads.
-
.workers ⇒ Object
Set of workers.
Class Method Summary collapse
-
.quiet ⇒ Object
Quiets the Karafka server.
-
.run ⇒ Object
Method which runs app.
-
.start ⇒ Object
Starts Karafka with a supervision (and it won’t happen until we explicitly want to stop).
-
.stop ⇒ Object
Stops Karafka with a supervision (as long as there is a shutdown timeout) If consumers or workers won’t stop in a given time frame, it will force them to exit.
Class Attribute Details
.execution_mode ⇒ Object
Mode in which the Karafka server is executed. It can be:
- :standalone - regular karafka consumer process
- :embedded - embedded in a different process and not supervised
- :supervisor - swarm supervisor process
- :swarm - one of swarm processes
Sometimes it is important to know in what mode we operate, especially from UI perspective as not everything is possible when operating in non-standalone mode, etc.
35 36 37 |
# File 'lib/karafka/server.rb', line 35 def execution_mode @execution_mode end |
.id ⇒ Object
id of the server. Useful for logging when we want to reference things issued by the server.
38 39 40 |
# File 'lib/karafka/server.rb', line 38 def id @id end |
.jobs_queue ⇒ Object
Jobs queue
24 25 26 |
# File 'lib/karafka/server.rb', line 24 def jobs_queue @jobs_queue end |
.listeners ⇒ Object
Set of consuming threads. Each consumer thread contains a single consumer
18 19 20 |
# File 'lib/karafka/server.rb', line 18 def listeners @listeners end |
.workers ⇒ Object
Set of workers
21 22 23 |
# File 'lib/karafka/server.rb', line 21 def workers @workers end |
Class Method Details
.quiet ⇒ Object
Quiets the Karafka server.
Karafka will stop processing but won’t quit the consumer group, so no rebalance will be triggered until final shutdown.
187 188 189 190 191 |
# File 'lib/karafka/server.rb', line 187 def quiet # We don't have to safe-guard it with check states as the state transitions work only # in one direction Karafka::App.quiet! end |
.run ⇒ Object
Method which runs app
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/karafka/server.rb', line 41 def run self.listeners = [] self.workers = [] # We need to validate this prior to running because it may be executed also from the # embedded # We cannot validate this during the start because config needs to be populated and routes # need to be defined. # # We do not validate in swarm because in swarm it is the supervisor that should validate # this. After the supervisor validation some of the internal states like SGs availability # may fail, so double validation may crash unless execution_mode.swarm? cli_contract.validate!( activity_manager.to_h, scope: %w[cli] ) end # We clear as we do not want parent handlers in case of working from fork process.clear process.on_sigint { stop } process.on_sigquit { stop } process.on_sigterm { stop } process.on_sigtstp { quiet } # Needed for instrumentation process.on_sigttin { nil } process.supervise # This will only run when not in a swarm mode. In swarm mode the server runs post-fork, so # warmup will do nothing Karafka::App.warmup # Start is blocking until stop is called and when we stop, it will wait until # all of the things are ready to stop start # We always need to wait for Karafka to stop here since we should wait for the stop running # in a separate thread (or trap context) to indicate everything is closed # Since `#start` is blocking, we will get here only after the runner is done. This will # not add any performance degradation because of that. sleep(0.1) until Karafka::App.terminated? # Try its best to shutdown underlying components before re-raising # rubocop:disable Lint/RescueException rescue Exception => e # rubocop:enable Lint/RescueException stop raise e end |
.start ⇒ Object
We don’t need to sleep because Karafka::Runner is locking and waiting to finish loop
Starts Karafka with a supervision (and it won’t happen until we explicitly want to stop)
95 96 97 |
# File 'lib/karafka/server.rb', line 95 def start Karafka::Runner.new.call end |
.stop ⇒ Object
This method is not async. It should not be executed from the workers as it will lock them forever. If you need to run Karafka shutdown from within workers threads, please start a separate thread to do so.
Stops Karafka with a supervision (as long as there is a shutdown timeout) If consumers or workers won’t stop in a given time frame, it will force them to exit
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/karafka/server.rb', line 105 def stop # Initialize the stopping process only if Karafka was running return if Karafka::App.stopping? return if Karafka::App.stopped? return if Karafka::App.terminated? Karafka::App.stop! timeout = shutdown_timeout # We check from time to time (for the timeout period) if all the threads finished # their work and if so, we can just return and normal shutdown process will take place # We divide it by 1000 because we use time in ms. ((timeout / 1_000) * (1 / supervision_sleep)).to_i.times do all_listeners_stopped = listeners.all?(&:stopped?) all_workers_stopped = workers.none?(&:alive?) return if all_listeners_stopped && all_workers_stopped sleep(supervision_sleep) end raise Errors::ForcefulShutdownError rescue Errors::ForcefulShutdownError => e Karafka.monitor.instrument( 'error.occurred', caller: self, error: e, type: 'app.stopping.error' ) # We're done waiting, lets kill them! workers.each(&:terminate) listeners.active.each(&:terminate) # We always need to shutdown clients to make sure we do not force the GC to close consumer. # This can cause memory leaks and crashes. # We run it in a separate thread in case this would hang and we ignore it after the time # we assigned to it and force shutdown as we prefer to stop the process rather than wait # indefinitely even with risk of VM crash as this is a last resort. Thread.new do listeners.each(&:shutdown) rescue StandardError => e # If anything wrong happened during shutdown, we also want to record it Karafka.monitor.instrument( 'error.occurred', caller: self, error: e, type: 'app.forceful_stopping.error' ) end.join(forceful_shutdown_wait / 1_000.0) # We also do not forcefully terminate everything when running in the embedded mode, # otherwise we would overwrite the shutdown process of the process that started Karafka return unless process.supervised? # exit! is not within the instrumentation as it would not trigger due to exit Kernel.exit!(forceful_exit_code) ensure # We need to check if it wasn't an early exit to make sure that only on stop invocation # can change the status after everything is closed if timeout Karafka::App.stopped! # We close producer as the last thing as it can be used in the notification pipeline # to dispatch state changes, etc Karafka::App.producer.close # Closes the default connection pool (if used). If not used, will do nothing # This ensures that if users have configured the default pool, it is closed correctly # # Custom pools need to be closed by users themselves WaterDrop::ConnectionPool.close Karafka::App.terminate! end end |