Class: Karafka::Connection::Listener

Inherits:
Object
Includes:
Helpers::Async
Defined in:
lib/karafka/connection/listener.rb

Overview

A single listener that listens to incoming messages from a single subscription group. It polls the messages and then enqueues jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.

This is the heart of the consumption process.

It provides an async API for management, so all status changes are expected to be asynchronous.

Instance Method Summary

Methods included from Helpers::Async

#alive?, #async_call, included

Constructor Details

#initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener

Returns listener instance.

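Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group that this listener handles

  • jobs_queue

    jobs queue in which this listener registers its subscription group

  • scheduler

    scheduler whose on_manage hook is invoked at each interval
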

# File 'lib/karafka/connection/listener.rb', line 32

def initialize(subscription_group, jobs_queue, scheduler)
  proc_config = ::Karafka::App.config.internal.processing

  @id = SecureRandom.hex(6)
  @subscription_group = subscription_group
  @jobs_queue = jobs_queue
  @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
  @client = Client.new(@subscription_group, -> { running? })
  @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
  @jobs_builder = proc_config.jobs_builder
  @partitioner = proc_config.partitioner_class.new(subscription_group)
  @scheduler = scheduler
  @events_poller = Helpers::IntervalRunner.new { @client.events_poll }
  # We keep one buffer for messages to preserve memory and not allocate extra objects
  # We can do this that way because we always first schedule jobs using messages before we
  # fetch another batch.
  @messages_buffer = MessagesBuffer.new(subscription_group)
  @mutex = Mutex.new
  @status = Status.new

  @jobs_queue.register(@subscription_group.id)

  # This makes sure that even if we tick more often than the interval time due to frequent
  # unlocks from short-lived jobs or async queues synchronization, events handling and jobs
  # scheduling still happens with the expected frequency
  @interval_runner = Helpers::IntervalRunner.new do
    @events_poller.call
    @scheduler.on_manage
  end
end
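
The interval-based throttling described in the comment above can be illustrated with a minimal sketch. SketchIntervalRunner and its interval_ms parameter are hypothetical stand-ins and are not the actual Helpers::IntervalRunner implementation; the sketch only assumes that the block should run at most once per interval, no matter how often it is ticked.

```ruby
# Hypothetical stand-in, NOT Karafka's Helpers::IntervalRunner.
# Assumption: the wrapped block should run at most once per interval.
class SketchIntervalRunner
  # @param interval_ms [Integer] minimum time between two block executions
  def initialize(interval_ms: 1_000, &block)
    @interval_ms = interval_ms
    @block = block
    @last_called_at = nil
  end

  # Runs the block only when enough time has passed since the previous run
  def call
    now = monotonic_ms
    return if @last_called_at && (now - @last_called_at) < @interval_ms

    @last_called_at = now
    @block.call
  end

  private

  # Monotonic clock, so wall-clock adjustments do not affect the interval
  def monotonic_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

runs = 0
runner = SketchIntervalRunner.new(interval_ms: 60_000) { runs += 1 }

# Ticking many times in a row triggers the block only on the first tick
5.times { runner.call }
```

With a long interval, repeated ticks collapse into a single execution, which is the property the constructor comment relies on for events handling and jobs scheduling.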

Instance Attribute Details

#id ⇒ String (readonly)

Can be useful for logging.

Returns:

  • (String)

    id of this listener



# File 'lib/karafka/connection/listener.rb', line 17

def id
  @id
end

#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)

Returns the subscription group that this listener handles.

Returns:

  • (Karafka::Routing::SubscriptionGroup)

    subscription group that this listener handles



# File 'lib/karafka/connection/listener.rb', line 20

def subscription_group
  @subscription_group
end

Instance Method Details

#active? ⇒ Boolean

Returns whether this listener is active (neither stopped nor pending).

Returns:

  • (Boolean)

    whether this listener is active (neither stopped nor pending)



# File 'lib/karafka/connection/listener.rb', line 99

def active?
  @status.active?
end

#call ⇒ Object

Note:

Prefetch callbacks can be used to seek to an offset or perform other setup before data consumption actually starts.

Runs the main listener fetch loop.



# File 'lib/karafka/connection/listener.rb', line 67

def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )

  fetch_loop

  Karafka.monitor.instrument(
    'connection.listener.after_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )
end
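
The way the two instrumentation events bracket the fetch loop can be sketched with a tiny publish/subscribe object. SketchMonitor is a hypothetical stand-in and not Karafka.monitor; only the two event names are taken from the code above, and the 'sg_example' payload value is made up for illustration.

```ruby
# Hypothetical stand-in for a monitor; NOT the Karafka.monitor implementation.
class SketchMonitor
  def initialize
    # Maps an event id to the list of blocks subscribed to it
    @listeners = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a block to be called whenever event_id is instrumented
  def subscribe(event_id, &block)
    @listeners[event_id] << block
  end

  # Publishes the event with its payload to every subscribed block
  def instrument(event_id, payload = {})
    @listeners[event_id].each { |listener| listener.call(payload) }
  end
end

monitor = SketchMonitor.new
log = []

monitor.subscribe('connection.listener.before_fetch_loop') do |payload|
  # A place for setup work that must happen before any data is consumed
  log << [:before, payload[:subscription_group]]
end

monitor.subscribe('connection.listener.after_fetch_loop') do |payload|
  log << [:after, payload[:subscription_group]]
end

# Mirrors the shape of #call: event, loop, event ('sg_example' is made up)
monitor.instrument('connection.listener.before_fetch_loop', subscription_group: 'sg_example')
log << [:fetch_loop]
monitor.instrument('connection.listener.after_fetch_loop', subscription_group: 'sg_example')
```

Because the first event fires before the loop starts, a subscriber to it runs before any message is fetched.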

#shutdown ⇒ Object

Note:

This method is not private, despite being part of the fetch loop, because during a forceful shutdown it may be invoked from a separate thread.

Note:

It is wrapped in a mutex precisely because of the forceful shutdown case above.

Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets, and stops the Kafka client.



# File 'lib/karafka/connection/listener.rb', line 124

def shutdown
  @mutex.synchronize do
    return if stopped?
    # Nothing to clear if it was not even running
    return stopped! if pending?

    @executors.clear
    @coordinators.reset
    @client.stop

    stopped!
  end
end
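
The mutex-guarded early return above makes shutdown safe to call from several threads. A minimal sketch of that pattern follows, assuming a hypothetical SketchStoppable class with a simplified two-state lifecycle; it is not the listener's actual status handling.

```ruby
# Hypothetical simplification of a mutex-guarded, idempotent shutdown.
class SketchStoppable
  attr_reader :cleanups

  def initialize
    @mutex = Mutex.new
    @state = :running
    @cleanups = 0
  end

  # Safe to call from any thread; cleanup runs only once
  def shutdown
    @mutex.synchronize do
      # Returning from inside the block still releases the mutex
      return if @state == :stopped

      @cleanups += 1
      @state = :stopped
    end
  end

  def stopped?
    @state == :stopped
  end
end

stoppable = SketchStoppable.new

# Simulates a regular shutdown racing with forceful ones from other threads
threads = Array.new(4) { Thread.new { stoppable.shutdown } }
threads.each(&:join)
```

Whichever thread takes the mutex first performs the cleanup; the others see the stopped state and return immediately.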

#start! ⇒ Object

Overrides the #start! state transition because starting also requires running the listener in an async thread. Other state transitions happen automatically, so changing the status is enough for them; here the background thread has to be started as well.
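
The difference between a plain status flip and a start that also launches background work can be sketched as follows. SketchRestartable, its state names, and its stop! method are hypothetical and only illustrate the reset-then-start flow; they are not the listener's real status object or async helper.

```ruby
# Hypothetical simplification; names and states are illustrative only.
class SketchRestartable
  attr_reader :state, :runs

  def initialize
    @state = :pending
    @runs = 0
    @thread = nil
  end

  # Resets a stopped instance, flips the state, and starts background work
  def start!
    @state = :pending if @state == :stopped
    @state = :running
    # Unlike other transitions, starting must also spawn the worker thread
    @thread = Thread.new { @runs += 1 }
  end

  # Waits for the background work to finish and marks the instance stopped
  def stop!
    @thread&.join
    @state = :stopped
  end
end

worker = SketchRestartable.new
worker.start!
worker.stop!
worker.start! # a restart after a stop goes through the reset branch
worker.stop!
```

Each start! launches a fresh thread, so a stopped instance can be started again without constructing a new object.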



# File 'lib/karafka/connection/listener.rb', line 106

def start!
  if stopped?
    @client.reset
    @status.reset!
  end

  @status.start!

  async_call("karafka.listener##{@subscription_group.id}")
end