Class: Karafka::Runner
- Inherits: Object
- Defined in: lib/karafka/runner.rb
Overview
Class used to run the Karafka listeners in separate threads.
Instance Method Summary
- #call ⇒ Object
Starts listening on all the listeners asynchronously and handles the jobs queue closing after listeners are done with their work.
Instance Method Details
#call ⇒ Object
Starts listening on all the listeners asynchronously and handles the jobs queue closing after listeners are done with their work.
# File 'lib/karafka/runner.rb', line 14

def call
  # Despite the possibility of having several independent listeners, we aim to have one jobs
  # queue shared across them and one workers pool for it
  jobs_queue = jobs_queue_class.new

  workers = Processing::WorkersBatch.new(jobs_queue)
  listeners = Connection::ListenersBatch.new(jobs_queue)

  # We mark it prior to delegating to the manager as manager will have to start at least one
  # connection to Kafka, hence running
  Karafka::App.run!

  # Register all the listeners so they can be started and managed
  manager.register(listeners)

  workers.each_with_index { |worker, i| worker.async_call("karafka.worker##{i}") }

  # We aggregate threads here for a supervised shutdown process
  Karafka::Server.workers = workers
  Karafka::Server.listeners = listeners
  Karafka::Server.jobs_queue = jobs_queue

  until manager.done?
    conductor.wait

    manager.control
  end

  # We close the jobs queue only when no listener threads are working.
  # This ensures that everything was closed prior to us not accepting any more jobs and that
  # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those
  # are done, we can close.
  jobs_queue.close

  # All the workers need to stop processing anything before we can stop the runner completely
  # This ensures that even async long-running jobs have time to finish before we are done
  # with everything. One thing worth keeping in mind though: it is the end user's
  # responsibility to handle the shutdown detection in their long-running processes.
  # Otherwise, if the timeout is exceeded, there will be a forced shutdown.
  workers.each(&:join)
# If anything crashes here, we need to raise the error and crash the runner because it means
# that something terrible happened
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'runner.call.error'
  )
  Karafka::App.stop!
  raise e
end
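The shutdown ordering described in the comments above (wait for the listeners, close the shared jobs queue, then join the workers) can be illustrated with a minimal sketch built only on Ruby's core Thread and Queue classes. This is not Karafka's implementation: the method name `run_pipeline`, its parameters, and the "double each job" workload are hypothetical stand-ins chosen for illustration.

```ruby
# Illustrative sketch only: hypothetical names, plain Ruby threads and queues,
# no Karafka classes involved.
def run_pipeline(listener_count: 2, worker_count: 3, jobs_per_listener: 5)
  jobs_queue = Queue.new # one queue shared by all listeners and workers
  results = Queue.new    # thread-safe collector for processed jobs

  # Workers block on pop. Once the queue is closed and drained, pop returns nil
  # and the loop ends, so each worker thread finishes on its own.
  workers = Array.new(worker_count) do |i|
    Thread.new do
      Thread.current.name = "sketch.worker##{i}"
      while (job = jobs_queue.pop)
        results << job * 2
      end
    end
  end

  # Listeners (stand-ins for Kafka consumers) enqueue jobs and then finish.
  listeners = Array.new(listener_count) do |l|
    Thread.new do
      jobs_per_listener.times { |j| jobs_queue << (l * jobs_per_listener + j) }
    end
  end

  # Wait until no listener is working, so nothing more can be enqueued ...
  listeners.each(&:join)
  # ... only then stop accepting jobs ...
  jobs_queue.close
  # ... and finally let the workers drain what is left before returning.
  workers.each(&:join)

  Array.new(results.size) { results.pop }.sort
end
```

Closing the queue before the listeners are done would make their later pushes raise ClosedQueueError, which is why the close happens only after every listener thread has been joined.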