Class: Karafka::Connection::Listener

Inherits:
Object
Includes:
Helpers::Async
Defined in:
lib/karafka/connection/listener.rb

Overview

A single listener that listens to incoming messages from a single subscription group. It polls the messages and then enqueues jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.

This is the heart of the consumption process.

It provides an async API for management, so all status changes are expected to happen asynchronously.
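As a rough sketch of how a listener is assembled and started (application code never does this directly; Karafka wires listeners during boot, and the queue and scheduler class names below are assumptions used only for illustration):

# Hypothetical wiring -- assumes `subscription_group` comes from the routing
jobs_queue = Karafka::Processing::JobsQueue.new
scheduler  = Karafka::Processing::Schedulers::Default.new(jobs_queue)

listener = Karafka::Connection::Listener.new(subscription_group, jobs_queue, scheduler)

listener.start!  # runs the fetch loop in a background thread
listener.alive?  # => true while that thread is running (from Helpers::Async)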

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::Async

#alive?, #async_call, included

Constructor Details

#initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener

Returns listener instance.

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group that this listener will handle

  • jobs_queue

    queue to which the consumption jobs should be enqueued

  • scheduler

    scheduler used to schedule and manage the jobs

# File 'lib/karafka/connection/listener.rb', line 44

def initialize(subscription_group, jobs_queue, scheduler)
  @id = SecureRandom.hex(6)
  @subscription_group = subscription_group
  @jobs_queue = jobs_queue
  @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
  @client = Client.new(@subscription_group, -> { running? })
  @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
  @partitioner = partitioner_class.new(subscription_group)
  @scheduler = scheduler
  @events_poller = Helpers::IntervalRunner.new { @client.events_poll }
  # We keep one buffer for messages to preserve memory and not allocate extra objects
  # We can do this that way because we always first schedule jobs using messages before we
  # fetch another batch.
  @messages_buffer = MessagesBuffer.new(subscription_group)
  @mutex = Mutex.new
  @status = Status.new

  @jobs_queue.register(@subscription_group.id)

  # This makes sure that even if we tick more often than the interval time due to frequent
  # unlocks from short-lived jobs or async queues synchronization, events handling and jobs
  # scheduling still happens with the expected frequency
  @interval_runner = Helpers::IntervalRunner.new do
    @events_poller.call
    @scheduler.on_manage
  end
end
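The comment above describes the throttling intent: even when the loop ticks more often than the configured interval (frequent unlocks from short-lived jobs, async queue synchronization), events polling and scheduler management should still run only at the expected frequency. Below is a stand-alone conceptual sketch of that idea; it is not the actual Helpers::IntervalRunner implementation, and the interval value is arbitrary.

class IntervalGate
  def initialize(interval_seconds, &block)
    @interval = interval_seconds
    @block = block
    @last_run_at = 0.0
  end

  # Runs the block only if at least @interval seconds passed since the last run,
  # no matter how frequently #call itself is invoked
  def call
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return if now - @last_run_at < @interval

    @last_run_at = now
    @block.call
  end
end

gate = IntervalGate.new(5) { puts 'polling events / managing jobs' }
10.times { gate.call; sleep(1) } # the block fires only on the 1st and 6th tick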

Instance Attribute Details

#coordinators ⇒ Processing::CoordinatorsBuffer (readonly)

Returns coordinator buffers that can be used directly in advanced cases of changes to the polling flow (like triggered seek back without messages ahead in the topic).

Returns:

  • (Processing::CoordinatorsBuffer)

    coordinator buffers that can be used directly in advanced cases of changes to the polling flow (like triggered seek back without messages ahead in the topic)



# File 'lib/karafka/connection/listener.rb', line 32

def coordinators
  @coordinators
end
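In the advanced scenario mentioned above (for example a triggered seek back with no messages ahead in the topic), the buffer can be reached through a listener instance. A hedged sketch; the find_or_create method name and its arguments are assumptions about the buffer's API.

# Hypothetical: fetch (or lazily build) the coordinator for a topic/partition
# handled by this listener, so custom polling-flow logic can use it
coordinator = listener.coordinators.find_or_create('events', 0)
coordinator # => coordinator object for topic "events", partition 0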

#id ⇒ String (readonly)

Can be useful for logging.

Returns:

  • (String)

    id of this listener



# File 'lib/karafka/connection/listener.rb', line 24

def id
  @id
end

#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)

Returns subscription group that this listener handles.

Returns:

  • (Karafka::Routing::SubscriptionGroup)

    subscription group that this listener handles

# File 'lib/karafka/connection/listener.rb', line 27

def subscription_group
  @subscription_group
end

Instance Method Details

#active? ⇒ Boolean

Returns whether this listener is active (not stopped and not pending).

Returns:

  • (Boolean)

    is this listener active (not stopped and not pending)



# File 'lib/karafka/connection/listener.rb', line 108

def active?
  @status.active?
end

#call ⇒ Object

Note:

Prefetch callbacks can be used to seek an offset or do other things before we actually start consuming data.

Runs the main listener fetch loop.



# File 'lib/karafka/connection/listener.rb', line 76

def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )

  fetch_loop

  Karafka.monitor.instrument(
    'connection.listener.after_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )
end
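The two instrumentation events published around the loop can be observed from application code through the standard monitor subscription API; the logging below is only an illustration.

Karafka.monitor.subscribe('connection.listener.before_fetch_loop') do |event|
  # Payload keys match the instrument calls above: :caller, :client, :subscription_group
  Karafka.logger.info("Listener #{event[:caller].id} entering fetch loop")
end

Karafka.monitor.subscribe('connection.listener.after_fetch_loop') do |event|
  Karafka.logger.info(
    "Listener #{event[:caller].id} left fetch loop " \
    "(subscription group: #{event[:subscription_group].id})"
  )
end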

#shutdown ⇒ Object

Note:

This method is not private despite being part of the fetch loop because, in the case of a forceful shutdown, it may be invoked from a separate thread.

Note:

We wrap it with a mutex exactly because of the above case of a forceful shutdown.

Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.



# File 'lib/karafka/connection/listener.rb', line 136

def shutdown
  @mutex.synchronize do
    return if stopped?
    # Nothing to clear if it was not even running
    return stopped! if pending?

    @executors.clear
    @coordinators.reset
    @client.stop

    stopped!
  end
end
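A small sketch of the forceful-shutdown case the notes describe: #call runs in the listener's async thread while a separate (supervising) thread requests the stop, which the internal mutex makes safe. The sleep is only a stand-in for a real stop signal.

listener.start!

supervisor = Thread.new do
  sleep(5)           # stand-in for receiving a stop signal
  listener.shutdown  # invoked from a different thread than the fetch loop
end

supervisor.join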

#start! ⇒ Object

We overwrite the status #start! because on start we also need to start running the listener in its async thread. While for other state transitions the automatic status change is enough, here we also need to run the background thread.



# File 'lib/karafka/connection/listener.rb', line 115

def start!
  if stopped?
    @client.reset
    @status.reset!
  end

  @status.start!

  async_call(
    "karafka.listener##{@subscription_group.id}",
    listener_thread_priority
  )
end