Class: Karafka::Server
- Inherits: Object
  - Object
  - Karafka::Server
- Defined in: lib/karafka/server.rb
Overview
Karafka consuming server class
Class Attribute Summary
- .execution_mode ⇒ Object
  Mode in which the Karafka server is executed.
- .jobs_queue ⇒ Object
  Jobs queue.
- .listeners ⇒ Object
  Set of consuming threads.
- .workers ⇒ Object
  Set of workers.
Class Method Summary
- .quiet ⇒ Object
  Quiets the Karafka server.
- .run ⇒ Object
  Runs the app.
- .start ⇒ Object
  Starts Karafka with supervision and blocks until a stop is explicitly requested.
- .stop ⇒ Object
  Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers or workers do not stop within the given time frame, it forces them to exit.
Class Attribute Details
.execution_mode ⇒ Object
Mode in which the Karafka server is executed. It can be:
- :standalone - regular Karafka consumer process
- :embedded - embedded in a different process and not supervised
- :supervisor - swarm supervisor process
- :swarm - one of swarm processes
Knowing the current mode matters, especially for the UI, because not every operation is available outside standalone mode.
# File 'lib/karafka/server.rb', line 31
def execution_mode
  @execution_mode
end
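The four documented modes can be handled with a simple lookup. The following is a minimal sketch, not Karafka code: `MODE_DESCRIPTIONS`, `describe_mode` and `standalone?` are hypothetical helpers introduced for illustration, and a literal symbol stands in for the value that `Karafka::Server.execution_mode` would return.

```ruby
# Hypothetical lookup table (not part of Karafka) built from the modes
# listed in the attribute description above.
MODE_DESCRIPTIONS = {
  standalone: 'regular Karafka consumer process',
  embedded: 'embedded in a different process and not supervised',
  supervisor: 'swarm supervisor process',
  swarm: 'one of swarm processes'
}.freeze

# Returns the description for a known mode and raises for anything else.
def describe_mode(mode)
  MODE_DESCRIPTIONS.fetch(mode) do
    raise ArgumentError, "unknown execution mode: #{mode.inspect}"
  end
end

# A UI could use a predicate like this to hide actions that only make
# sense in standalone mode.
def standalone?(mode)
  mode == :standalone
end

# In a real application the argument would come from
# Karafka::Server.execution_mode; a literal keeps the sketch runnable
# without Karafka loaded.
puts describe_mode(:embedded)
```

Raising on an unknown symbol, rather than returning nil, makes it obvious when a new mode has been added and the caller was not updated.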
.jobs_queue ⇒ Object
Jobs queue
# File 'lib/karafka/server.rb', line 20
def jobs_queue
  @jobs_queue
end
.listeners ⇒ Object
Set of consuming threads. Each consumer thread contains a single consumer.
# File 'lib/karafka/server.rb', line 14
def listeners
  @listeners
end
.workers ⇒ Object
Set of workers
# File 'lib/karafka/server.rb', line 17
def workers
  @workers
end
Class Method Details
.quiet ⇒ Object
Quiets the Karafka server.
Karafka will stop processing but won’t quit the consumer group, so no rebalance will be triggered until final shutdown.
# File 'lib/karafka/server.rb', line 167
def quiet
  # We don't have to safe-guard it with check states as the state transitions work only
  # in one direction
  Karafka::App.quiet!
end
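The comment in .quiet relies on state transitions moving in one direction only, which is why no guard is needed before calling `Karafka::App.quiet!`. A minimal sketch of that idea follows. `OneWayStatus` is a hypothetical class and its state names are illustrative; it is not Karafka's status implementation.

```ruby
# Hypothetical one-way status object. The state names and their order are
# assumptions made for illustration only.
class OneWayStatus
  ORDER = %i[running quieting stopping stopped terminated].freeze

  attr_reader :current

  def initialize
    @current = :running
  end

  # Moves forward to +target+ only; a request to move backwards (or to the
  # current state) is silently ignored.
  def advance_to(target)
    unless ORDER.include?(target)
      raise ArgumentError, "unknown state: #{target.inspect}"
    end

    @current = target if ORDER.index(target) > ORDER.index(@current)
    @current
  end
end

status = OneWayStatus.new
status.advance_to(:stopping)
status.advance_to(:quieting) # ignored: the status is already past quieting
puts status.current
```

Because a late quiet request cannot undo a stop that is already in progress, the caller does not have to check the state first.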
.run ⇒ Object
Runs the app: validates the configuration, installs the signal handlers, starts the server, and blocks until the application has terminated.
# File 'lib/karafka/server.rb', line 34
def run
  self.listeners = []
  self.workers = []

  # We need to validate this prior to running because it may be executed also from the
  # embedded
  # We cannot validate this during the start because config needs to be populated and routes
  # need to be defined.
  config.internal.cli.contract.validate!(
    config.internal.routing.activity_manager.to_h
  )

  # We clear as we do not want parent handlers in case of working from fork
  process.clear
  process.on_sigint { stop }
  process.on_sigquit { stop }
  process.on_sigterm { stop }
  process.on_sigtstp { quiet }
  # Needed for instrumentation
  process.on_sigttin {}
  process.supervise

  # This will only run when not in a swarm mode. In swarm mode the server runs post-fork, so
  # warmup will do nothing
  Karafka::App.warmup

  # Start is blocking until stop is called and when we stop, it will wait until
  # all of the things are ready to stop
  start

  # We always need to wait for Karafka to stop here since we should wait for the stop running
  # in a separate thread (or trap context) to indicate everything is closed
  # Since `#start` is blocking, we will get here only after the runner is done. This will
  # not add any performance degradation because of that.
  sleep(0.1) until Karafka::App.terminated?
# Try its best to shutdown underlying components before re-raising
# rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  stop

  raise e
end
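The control flow of .run (start, wait until the application reports termination, and attempt a shutdown before re-raising any exception) can be sketched without Karafka. `ToyServer` and everything in it are hypothetical stand-ins. Unlike the real .start, the toy `start` returns immediately and simulates a stop request arriving from another thread.

```ruby
# Hypothetical stand-in for the server and application state. It only
# mimics the ordering of events, not Karafka's behaviour.
class ToyServer
  attr_reader :events

  def initialize(fail_on_start: false)
    @fail_on_start = fail_on_start
    @terminated = false
    @events = []
  end

  def terminated?
    @terminated
  end

  # Returns at once (the real .start blocks). A background thread plays the
  # role of a signal handler asking for a stop shortly afterwards.
  def start
    @events << :start
    raise 'boom' if @fail_on_start

    Thread.new do
      sleep(0.05)
      stop
    end
  end

  def stop
    @events << :stop
    @terminated = true
  end

  def run
    start
    # Wait for the stop running in the other thread to complete.
    sleep(0.01) until terminated?
    @events << :terminated
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Best-effort shutdown before the error is passed on, as in .run.
    stop
    raise e
  end
end

server = ToyServer.new
server.run
p server.events
```

Rescuing `Exception` rather than `StandardError` mirrors the original listing: the goal is to attempt a shutdown whatever went wrong, then let the error propagate unchanged.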
.start ⇒ Object
Note: There is no need to sleep here because Karafka::Runner blocks and waits for its loop to finish, which does not happen until a stop is explicitly requested.
Starts Karafka with supervision.
# File 'lib/karafka/server.rb', line 81
def start
  Karafka::Runner.new.call
end
.stop ⇒ Object
Note: This method is not async. It should not be executed from the workers, as it will lock them forever. If you need to run the Karafka shutdown from within worker threads, start a separate thread to do so.
Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers or workers do not stop within the given time frame, it forces them to exit.
# File 'lib/karafka/server.rb', line 91
def stop
  # Initialize the stopping process only if Karafka was running
  return if Karafka::App.stopping?
  return if Karafka::App.stopped?
  return if Karafka::App.terminated?

  Karafka::App.stop!

  timeout = config.shutdown_timeout

  # We check from time to time (for the timeout period) if all the threads finished
  # their work and if so, we can just return and normal shutdown process will take place
  # We divide it by 1000 because we use time in ms.
  ((timeout / 1_000) * (1 / config.internal.supervision_sleep)).to_i.times do
    all_listeners_stopped = listeners.all?(&:stopped?)
    all_workers_stopped = workers.none?(&:alive?)

    return if all_listeners_stopped && all_workers_stopped

    sleep(config.internal.supervision_sleep)
  end

  raise Errors::ForcefulShutdownError
rescue Errors::ForcefulShutdownError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'app.stopping.error'
  )

  # We're done waiting, lets kill them!
  workers.each(&:terminate)
  listeners.active.each(&:terminate)

  # We always need to shutdown clients to make sure we do not force the GC to close consumer.
  # This can cause memory leaks and crashes.
  # We run it in a separate thread in case this would hang and we ignore it after the time
  # we assigned to it and force shutdown as we prefer to stop the process rather than wait
  # indefinitely even with risk of VM crash as this is a last resort.
  Thread.new do
    listeners.each(&:shutdown)
  rescue StandardError => e
    # If anything wrong happened during shutdown, we also want to record it
    Karafka.monitor.instrument(
      'error.occurred',
      caller: self,
      error: e,
      type: 'app.forceful_stopping.error'
    )
  end.join(FORCEFUL_SHUTDOWN_WAIT)

  # We also do not forcefully terminate everything when running in the embedded mode,
  # otherwise we would overwrite the shutdown process of the process that started Karafka
  return unless process.supervised?

  # exit! is not within the instrumentation as it would not trigger due to exit
  Kernel.exit!(config.internal.forceful_exit_code)
ensure
  # We need to check if it wasn't an early exit to make sure that only on stop invocation
  # can change the status after everything is closed
  if timeout
    Karafka::App.stopped!

    # We close producer as the last thing as it can be used in the notification pipeline
    # to dispatch state changes, etc
    Karafka::App.producer.close

    Karafka::App.terminate!
  end
end
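The supervision loop in .stop polls a fixed number of times before giving up. A minimal sketch of that arithmetic and of the polling itself follows. `supervision_checks` and `wait_for_workers` are hypothetical helpers, plain Ruby threads stand in for workers, and the values 60_000 ms and 0.1 s are illustrative rather than claimed defaults.

```ruby
# Number of checks made before a forceful shutdown, using the same
# expression as .stop. timeout_ms is in milliseconds, supervision_sleep
# in seconds.
def supervision_checks(timeout_ms, supervision_sleep)
  ((timeout_ms / 1_000) * (1 / supervision_sleep)).to_i
end

# Polls until no worker thread is alive or the checks run out. Returns
# true on a graceful stop and false when the caller would have to force
# termination.
def wait_for_workers(workers, timeout_ms:, supervision_sleep:)
  supervision_checks(timeout_ms, supervision_sleep).times do
    return true if workers.none?(&:alive?)

    sleep(supervision_sleep)
  end

  false
end

# 60 whole seconds, 10 checks per second.
puts supervision_checks(60_000, 0.1)
```

With an Integer timeout, `timeout_ms / 1_000` is integer division, so in this sketch a timeout of 1_500 ms yields the same 10 checks as 1_000 ms. The note on .stop also applies here: a worker that needs to trigger a shutdown should hand the call to a separate thread, for example `Thread.new { Karafka::Server.stop }`, instead of calling it inline.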