Class: Karafka::Connection::PausesManager

Inherits:
Object
Defined in:
lib/karafka/connection/pauses_manager.rb

Overview

Abstraction layer for managing partition pauses. It aggregates the pause trackers of all the topic partitions that we are working with.

Instance Method Summary

Constructor Details

#initialize ⇒ Karafka::Connection::PausesManager

Returns a new pauses manager.



# File 'lib/karafka/connection/pauses_manager.rb', line 9

def initialize
  @pauses = Hash.new do |h, k|
    h[k] = {}
  end
end
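The constructor relies on Ruby's `Hash.new` default block, so the first lookup of an unseen topic stores a fresh inner hash for its partitions. A minimal stand-alone sketch of that behavior follows; the symbol keys and the placeholder value are illustrative only and are not part of Karafka.

```ruby
# Stand-alone illustration of the default-block pattern used by the constructor.
# The keys (:orders, :payments) and the stored value are hypothetical.
pauses = Hash.new do |h, k|
  h[k] = {}
end

# Reading a missing key runs the block, which stores and returns a new Hash.
orders = pauses[:orders]
orders[0] = :tracker_for_partition_0

# A second lookup returns the same stored Hash, not a fresh one.
same_object = pauses[:orders].equal?(orders)

# Each key gets its own inner Hash, so topics do not share state.
isolated = pauses[:payments].empty?

puts same_object # prints "true"
puts isolated    # prints "true"
```

Because the block assigns `h[k]`, the inner hash persists after the first read. A plain `Hash.new({})` would instead hand back one shared default object that is never stored under the key.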

Instance Method Details

#fetch(topic, partition) ⇒ Karafka::TimeTrackers::Pause

Creates or fetches the pause tracker for a given topic partition.

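Parameters:

  • topic: topic object whose pause_timeout, pause_max_timeout and pause_with_exponential_backoff settings configure a newly created tracker
  • partition: partition of that topic

Returns:

  • (Karafka::TimeTrackers::Pause): pause tracker for the given topic partition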



# File 'lib/karafka/connection/pauses_manager.rb', line 20

def fetch(topic, partition)
  @pauses[topic][partition] ||= TimeTrackers::Pause.new(
    timeout: topic.pause_timeout,
    max_timeout: topic.pause_max_timeout,
    exponential_backoff: topic.pause_with_exponential_backoff
  )
end
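The `||=` in `#fetch` means a tracker is built only on the first call for a topic partition and reused afterwards. The sketch below shows this without loading Karafka. `FetchSketchTopic`, `FetchSketchPause` and `FetchSketchManager` are hypothetical stand-ins, and the timeout values are arbitrary; only the method body mirrors the source above.

```ruby
# Hypothetical stand-in for a topic exposing the three pause settings.
FetchSketchTopic = Struct.new(
  :name, :pause_timeout, :pause_max_timeout, :pause_with_exponential_backoff
)

# Hypothetical stand-in for the pause tracker; it only records its settings.
class FetchSketchPause
  attr_reader :timeout, :max_timeout, :exponential_backoff

  def initialize(timeout:, max_timeout:, exponential_backoff:)
    @timeout = timeout
    @max_timeout = max_timeout
    @exponential_backoff = exponential_backoff
  end
end

# Same storage layout and fetch logic as the documented class.
class FetchSketchManager
  def initialize
    @pauses = Hash.new { |h, k| h[k] = {} }
  end

  def fetch(topic, partition)
    @pauses[topic][partition] ||= FetchSketchPause.new(
      timeout: topic.pause_timeout,
      max_timeout: topic.pause_max_timeout,
      exponential_backoff: topic.pause_with_exponential_backoff
    )
  end
end

manager = FetchSketchManager.new
topic = FetchSketchTopic.new('events', 1_000, 30_000, true) # arbitrary values

first = manager.fetch(topic, 0)
again = manager.fetch(topic, 0) # reuses the tracker created above
other = manager.fetch(topic, 1) # different partition, new tracker

puts first.equal?(again) # prints "true"
puts first.equal?(other) # prints "false"
puts first.timeout       # prints "1000"
```

Since the tracker is created once, the topic's pause settings are read only at that first call; later calls return the stored tracker as it is.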

#resume {|topic, partition| ... } ⇒ Object

Resumes processing of partitions for which pause time has ended.

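Yield Parameters:

  • topic: topic of the partition that was just resumed
  • partition: the partition that was just resumed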



# File 'lib/karafka/connection/pauses_manager.rb', line 32

def resume
  @pauses.each do |topic, partitions|
    partitions.each do |partition, pause|
      next unless pause.paused?
      next unless pause.expired?

      pause.resume

      yield(topic, partition)
    end
  end
end
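To illustrate which partitions `#resume` yields, here is a small sketch with a deterministic, hypothetical tracker. `ResumeSketchPause` and `ResumeSketchManager` are stand-ins invented for this example. The real tracker decides `expired?` from elapsed time, whereas the stub uses fixed flags so the outcome is predictable.

```ruby
# Hypothetical tracker with fixed flags instead of real time tracking.
class ResumeSketchPause
  def initialize(paused:, expired:)
    @paused = paused
    @expired = expired
  end

  def paused?
    @paused
  end

  def expired?
    @expired
  end

  def resume
    @paused = false
  end
end

# Stand-in manager; the resume body follows the documented method.
class ResumeSketchManager
  def initialize(pauses)
    @pauses = pauses
  end

  def resume
    @pauses.each do |topic, partitions|
      partitions.each do |partition, pause|
        next unless pause.paused?
        next unless pause.expired?

        pause.resume

        yield(topic, partition)
      end
    end
  end
end

manager = ResumeSketchManager.new(
  'events' => {
    0 => ResumeSketchPause.new(paused: true, expired: true),  # due for resume
    1 => ResumeSketchPause.new(paused: true, expired: false), # still waiting
    2 => ResumeSketchPause.new(paused: false, expired: true)  # not paused
  }
)

resumed = []
manager.resume { |topic, partition| resumed << [topic, partition] }

# Partition 0 is no longer paused, so a second pass yields nothing.
resumed_again = []
manager.resume { |topic, partition| resumed_again << [topic, partition] }

p resumed       # prints [["events", 0]]
p resumed_again # prints []
```

The block is where a caller would resume consumption of the yielded topic partition; how that is done is outside the scope of this page. Note that the method uses a bare `yield`, so calling it without a block raises `LocalJumpError` as soon as a partition qualifies.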