Class: Karafka::Connection::ListenersBatch

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/karafka/connection/listeners_batch.rb

Overview

Abstraction layer around listeners batch.

Instance Method Summary collapse

Constructor Details

#initialize(jobs_queue) ⇒ ListenersBatch

Parameters:

  • jobs_queue (JobsQueue)


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/karafka/connection/listeners_batch.rb', line 11

def initialize(jobs_queue)
  # We need one scheduler for all the listeners because in case of complex schedulers, they
  # should be able to distribute work whenever any work is done in any of the listeners
  scheduler = App.config.internal.processing.scheduler_class.new(jobs_queue)

  @batch = App.subscription_groups.flat_map do |_consumer_group, subscription_groups|
    subscription_groups.map do |subscription_group|
      Connection::Listener.new(
        subscription_group,
        jobs_queue,
        scheduler
      )
    end
  end
end

Instance Method Details

#activeArray<Listener>

Returns active listeners.

Returns:

  • (Array<Listener>)

    active listeners



34
35
36
# File 'lib/karafka/connection/listeners_batch.rb', line 34

def active
  select(&:active?)
end

#each(&block) ⇒ Object

Iterates over available listeners and yields each listener

Parameters:

  • block (Proc)

    block we want to run



29
30
31
# File 'lib/karafka/connection/listeners_batch.rb', line 29

def each(&block)
  @batch.each(&block)
end