Class: Karafka::Connection::Listener
- Inherits:
-
Object
- Object
- Karafka::Connection::Listener
- Includes:
- Helpers::Async
- Defined in:
- lib/karafka/connection/listener.rb
Overview
A single listener that listens to incoming messages from a single subscription group. It polls the messages and then enqueues jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.
This is the heart of the consumption process.
It provides async API for managing, so all status changes are expected to be async.
Instance Attribute Summary collapse
-
#coordinators ⇒ Processing::CoordinatorsBuffer
readonly
Coordinator buffers that can be used directly in advanced cases of changes to the polling flow (like triggered seek back without messages ahead in the topic).
-
#id ⇒ String
readonly
Can be useful for logging.
-
#subscription_group ⇒ Karafka::Routing::SubscriptionGroup
readonly
Subscription group that this listener handles.
Instance Method Summary collapse
-
#active? ⇒ Boolean
Is this listener active (not stopped and not pending).
-
#call ⇒ Object
Runs the main listener fetch loop.
-
#initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener
constructor
Listener instance.
-
#shutdown ⇒ Object
Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops kafka client.
-
#start! ⇒ Object
We overwrite the state
#start
because on start we need to also start running listener in the async thread.
Methods included from Helpers::Async
#alive?, #async_call, included
Constructor Details
#initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener
Returns listener instance.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/karafka/connection/listener.rb', line 44 def initialize(subscription_group, jobs_queue, scheduler) @id = SecureRandom.hex(6) @subscription_group = subscription_group @jobs_queue = jobs_queue @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics) @client = Client.new(@subscription_group, -> { running? }) @executors = Processing::ExecutorsBuffer.new(@client, subscription_group) @partitioner = partitioner_class.new(subscription_group) @scheduler = scheduler @events_poller = Helpers::IntervalRunner.new { @client.events_poll } # We keep one buffer for messages to preserve memory and not allocate extra objects # We can do this that way because we always first schedule jobs using messages before we # fetch another batch. @messages_buffer = MessagesBuffer.new(subscription_group) @mutex = Mutex.new @status = Status.new @jobs_queue.register(@subscription_group.id) # This makes sure that even if we tick more often than the interval time due to frequent # unlocks from short-lived jobs or async queues synchronization, events handling and jobs # scheduling still happens with the expected frequency @interval_runner = Helpers::IntervalRunner.new do @events_poller.call @scheduler.on_manage end end |
Instance Attribute Details
#coordinators ⇒ Processing::CoordinatorsBuffer (readonly)
Returns coordinator buffers that can be used directly in advanced cases of changes to the polling flow (like triggered seek back without messages ahead in the topic).
32 33 34 |
# File 'lib/karafka/connection/listener.rb', line 32 def coordinators @coordinators end |
#id ⇒ String (readonly)
Can be useful for logging
24 25 26 |
# File 'lib/karafka/connection/listener.rb', line 24 def id @id end |
#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)
Returns subscription group that this listener handles.
27 28 29 |
# File 'lib/karafka/connection/listener.rb', line 27 def subscription_group @subscription_group end |
Instance Method Details
#active? ⇒ Boolean
Returns is this listener active (not stopped and not pending).
108 109 110 |
# File 'lib/karafka/connection/listener.rb', line 108 def active? @status.active? end |
#call ⇒ Object
Prefetch callbacks can be used to seek offset or do other things before we actually start consuming data
Runs the main listener fetch loop.
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/karafka/connection/listener.rb', line 76 def call Karafka.monitor.instrument( 'connection.listener.before_fetch_loop', caller: self, client: @client, subscription_group: @subscription_group ) fetch_loop Karafka.monitor.instrument( 'connection.listener.after_fetch_loop', caller: self, client: @client, subscription_group: @subscription_group ) end |
#shutdown ⇒ Object
This method is not private despite being part of the fetch loop because in case of a forceful shutdown, it may be invoked from a separate thread
We wrap it with a mutex exactly because of the above case of forceful shutdown
Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops kafka client.
136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/karafka/connection/listener.rb', line 136 def shutdown @mutex.synchronize do return if stopped? # Nothing to clear if it was not even running return stopped! if pending? @executors.clear @coordinators.reset @client.stop stopped! end end |
#start! ⇒ Object
We overwrite the state #start
because on start we need to also start running listener in the async thread. While other state transitions happen automatically and status state change is enough, here we need to run the background threads
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/karafka/connection/listener.rb', line 115 def start! if stopped? @client.reset @status.reset! end @status.start! async_call( "karafka.listener##{@subscription_group.id}", listener_thread_priority ) end |