Class: Karafka::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/runner.rb

Overview

Class used to run the Karafka listeners in separate threads

Instance Method Summary collapse

Instance Method Details

#callObject

Starts listening on all the listeners asynchronously and handles the jobs queue closing after listeners are done with their work.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/karafka/runner.rb', line 14

def call
  # Despite possibility of having several independent listeners, we aim to have one queue for
  # jobs across and one workers poll for that
  jobs_queue = jobs_queue_class.new

  workers = Processing::WorkersBatch.new(jobs_queue)
  listeners = Connection::ListenersBatch.new(jobs_queue)

  # We mark it prior to delegating to the manager as manager will have to start at least one
  # connection to Kafka, hence running
  Karafka::App.run!

  # Register all the listeners so they can be started and managed
  manager.register(listeners)

  workers.each_with_index { |worker, i| worker.async_call("karafka.worker##{i}") }

  # We aggregate threads here for a supervised shutdown process
  Karafka::Server.workers = workers
  Karafka::Server.listeners = listeners
  Karafka::Server.jobs_queue = jobs_queue

  until manager.done?
    conductor.wait

    manager.control
  end

  # We close the jobs queue only when no listener threads are working.
  # This ensures, that everything was closed prior to us not accepting anymore jobs and that
  # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those
  # are done, we can close.
  jobs_queue.close

  # All the workers need to stop processing anything before we can stop the runner completely
  # This ensures that even async long-running jobs have time to finish before we are done
  # with everything. One thing worth keeping in mind though: It is the end user responsibility
  # to handle the shutdown detection in their long-running processes. Otherwise if timeout
  # is exceeded, there will be a forced shutdown.
  workers.each(&:join)
# If anything crashes here, we need to raise the error and crush the runner because it means
# that something terrible happened
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'runner.call.error'
  )
  Karafka::App.stop!
  raise e
end