Class: Karafka::Connection::ListenersBatch
- Inherits:
- Object
- Object
- Karafka::Connection::ListenersBatch
- Includes:
- Enumerable
- Defined in:
- lib/karafka/connection/listeners_batch.rb
Overview
Abstraction layer around a batch of listeners.
Instance Method Summary
- #active ⇒ Array<Listener>
  Active listeners.
- #each(&block) ⇒ Object
  Iterates over available listeners and yields each listener.
- #initialize(jobs_queue) ⇒ ListenersBatch
  Constructor.
Constructor Details
#initialize(jobs_queue) ⇒ ListenersBatch
    # File 'lib/karafka/connection/listeners_batch.rb', line 11

    def initialize(jobs_queue)
      # We need one scheduler for all the listeners because in case of complex schedulers, they
      # should be able to distribute work whenever any work is done in any of the listeners
      scheduler = App.config.internal.processing.scheduler_class.new(jobs_queue)

      @batch = App.subscription_groups.flat_map do |_consumer_group, subscription_groups|
        subscription_groups.map do |subscription_group|
          Connection::Listener.new(
            subscription_group,
            jobs_queue,
            scheduler
          )
        end
      end
    end
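The pattern used by the constructor, one scheduler created up front and handed to every listener, can be tried outside Karafka. The following is a minimal sketch only: `SketchScheduler`, `SketchListener`, and `build_listeners` are hypothetical stand-ins rather than Karafka classes, and the input is assumed to be a Hash of consumer group to subscription groups, which is the shape suggested by the block arguments `|_consumer_group, subscription_groups|`.

```ruby
# Hypothetical stand-ins for illustration only; these are not Karafka classes.
SketchScheduler = Struct.new(:jobs_queue)
SketchListener = Struct.new(:subscription_group, :jobs_queue, :scheduler)

# Builds one listener per subscription group, all sharing a single scheduler.
# `groups` is assumed to be a Hash: consumer group => array of subscription groups.
def build_listeners(groups, jobs_queue)
  # Created once, outside the loops, so every listener receives the same object
  scheduler = SketchScheduler.new(jobs_queue)

  # flat_map flattens the per-consumer-group arrays into one flat list
  groups.flat_map do |_consumer_group, subscription_groups|
    subscription_groups.map do |subscription_group|
      SketchListener.new(subscription_group, jobs_queue, scheduler)
    end
  end
end

listeners = build_listeners({ 'cg_a' => %w[sg1 sg2], 'cg_b' => %w[sg3] }, Queue.new)
shared = listeners.first.scheduler

puts listeners.map(&:subscription_group).inspect             # ["sg1", "sg2", "sg3"]
puts listeners.all? { |l| l.scheduler.equal?(shared) }       # true
```

Creating the scheduler inside the inner block instead would give each listener its own instance, which is what the source comment warns against for schedulers that need to react to work finishing in any listener.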
Instance Method Details
#active ⇒ Array<Listener>
Returns active listeners.
    # File 'lib/karafka/connection/listeners_batch.rb', line 34

    def active
      select(&:active?)
    end
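`select(&:active?)` is shorthand for `select { |listener| listener.active? }`, and `select` comes from `Enumerable`, so it builds a fresh Array on every call. The sketch below illustrates this with hypothetical stand-ins (`ToyListener`, `ToyBatch`, and the `stop` method are assumptions made for the example, not Karafka's API).

```ruby
# Hypothetical stand-in listener; only the `active?` predicate matters here.
class ToyListener
  attr_reader :id

  def initialize(id, active)
    @id = id
    @active = active
  end

  def active?
    @active
  end

  # Illustrative only: flips the flag so the filter result changes
  def stop
    @active = false
  end
end

# Minimal batch mirroring the documented `each` and `active` methods.
class ToyBatch
  include Enumerable

  def initialize(listeners)
    @batch = listeners
  end

  def each(&block)
    @batch.each(&block)
  end

  # Equivalent to: select { |listener| listener.active? }
  def active
    select(&:active?)
  end
end

batch = ToyBatch.new(
  [ToyListener.new(1, true), ToyListener.new(2, false), ToyListener.new(3, true)]
)
puts batch.active.map(&:id).inspect # [1, 3]

batch.first.stop
puts batch.active.map(&:id).inspect # [3]
```

Because the result is recomputed each time, callers that need a stable view should hold on to the returned Array rather than call `active` repeatedly.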
#each(&block) ⇒ Object
Iterates over available listeners and yields each listener.
    # File 'lib/karafka/connection/listeners_batch.rb', line 29

    def each(&block)
      @batch.each(&block)
    end
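Defining `each` and including `Enumerable` is what makes methods such as `select`, `map`, and `count` available on the batch. As a minimal sketch of that delegation pattern (`DelegatingBatch` is a hypothetical class wrapping plain strings, not Karafka code):

```ruby
# Hypothetical minimal collection; mirrors only the delegation pattern of `each`.
class DelegatingBatch
  include Enumerable

  def initialize(items)
    @batch = items
  end

  # Forwards the block to the wrapped array. Without a block, Array#each
  # returns an Enumerator, so this method does too.
  def each(&block)
    @batch.each(&block)
  end
end

batch = DelegatingBatch.new(%w[a b c])

puts batch.map(&:upcase).inspect          # ["A", "B", "C"]
puts batch.count                          # 3
puts batch.each.class                     # Enumerator
puts batch.each_with_index.to_a.inspect   # [["a", 0], ["b", 1], ["c", 2]]
```

When a block is given, `Array#each` returns the array itself, so a delegating `each` like this one returns the wrapped array rather than the batch object.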