Class: Karafka::Connection::ListenersBatch

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/karafka/connection/listeners_batch.rb

Overview

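Abstraction layer around a batch of listeners. It builds one listener per subscription group, all sharing a single scheduler, and exposes them through Enumerable.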

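Instance Method Summary

  • #active ⇒ Array<Listener>: Returns active listeners.
  • #each ⇒ Object: Iterates over available listeners and yields each listener.
  • #initialize(jobs_queue) ⇒ ListenersBatch: Constructor.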

Constructor Details

#initialize(jobs_queue) ⇒ ListenersBatch

Parameters:

  • jobs_queue (JobsQueue)


# File 'lib/karafka/connection/listeners_batch.rb', line 11

def initialize(jobs_queue)
  # We need one scheduler for all the listeners because in case of complex schedulers, they
  # should be able to distribute work whenever any work is done in any of the listeners
  scheduler = App.config.internal.processing.scheduler_class.new(jobs_queue)

  @batch = App.subscription_groups.flat_map do |_consumer_group, subscription_groups|
    subscription_groups.map do |subscription_group|
      Connection::Listener.new(
        subscription_group,
        jobs_queue,
        scheduler
      )
    end
  end
end
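
The construction pattern above (one scheduler shared by every listener, with listeners flattened across consumer groups) can be sketched without Karafka. This is a minimal illustration only: `FakeScheduler`, `FakeListener`, and the `subscription_groups` hash are hypothetical stand-ins, not Karafka classes or data, and the hash shape is an assumption based on the two block arguments used in the constructor.

```ruby
# Hypothetical stand-ins; none of these are Karafka's real classes.
FakeScheduler = Struct.new(:queue)
FakeListener = Struct.new(:subscription_group, :jobs_queue, :scheduler)

# Assumed shape: consumer group => list of its subscription groups,
# mirroring the |_consumer_group, subscription_groups| block arguments.
subscription_groups = {
  'group_a' => %w[sg_1 sg_2],
  'group_b' => %w[sg_3]
}

jobs_queue = Queue.new

# One scheduler instance, created once and handed to every listener.
scheduler = FakeScheduler.new(jobs_queue)

# flat_map turns the per-consumer-group arrays into a single flat array.
listeners = subscription_groups.flat_map do |_consumer_group, groups|
  groups.map { |group| FakeListener.new(group, jobs_queue, scheduler) }
end

puts listeners.map(&:subscription_group).inspect
puts listeners.all? { |listener| listener.scheduler.equal?(scheduler) }
```

Because every listener holds the same scheduler object, a scheduler that tracks state across listeners sees work reported by any of them, which is the motivation given in the constructor's comment.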

Instance Method Details

#active ⇒ Array<Listener>

Returns active listeners.

Returns:

  • (Array<Listener>)

    active listeners



# File 'lib/karafka/connection/listeners_batch.rb', line 33

def active
  select(&:active?)
end

#each ⇒ Object

Iterates over available listeners and yields each listener.



# File 'lib/karafka/connection/listeners_batch.rb', line 28

def each(&)
  @batch.each(&)
end
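
Since the class includes Enumerable, defining #each is what makes `select` (and therefore #active) available. The following is a rough sketch of that relationship, not the library's implementation: `DemoListener` and `DemoBatch` are hypothetical names, and the explicit `&block` parameter is used here only so the sketch also runs on Ruby versions older than 3.1, where the anonymous `&` form is not supported.

```ruby
# Hypothetical listener stand-in exposing an active? predicate.
DemoListener = Struct.new(:name, :active) do
  def active?
    active
  end
end

# Hypothetical batch wrapper following the same shape as the class above.
class DemoBatch
  include Enumerable

  def initialize(listeners)
    @batch = listeners
  end

  # Enumerable builds select, map, count, and others on top of each.
  def each(&block)
    @batch.each(&block)
  end

  # Filters through Enumerable#select, which calls each internally.
  def active
    select(&:active?)
  end
end

batch = DemoBatch.new(
  [
    DemoListener.new('a', true),
    DemoListener.new('b', false),
    DemoListener.new('c', true)
  ]
)

active_names = batch.active.map(&:name)
total = batch.count

puts active_names.inspect
puts total
```

Delegating iteration to the underlying array keeps the wrapper small while still giving callers the full Enumerable interface.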