Class: Karafka::Connection::PausesManager
- Inherits: Object
  - Object
  - Karafka::Connection::PausesManager
- Defined in: lib/karafka/connection/pauses_manager.rb
Overview
Abstraction layer for managing partition pauses. It aggregates the pauses of all the partitions that we are working with.
Instance Method Summary
- #fetch(topic, partition) ⇒ Karafka::TimeTrackers::Pause
  Creates or fetches the pause tracker for a given topic partition.
- #initialize ⇒ Karafka::Connection::PausesManager (constructor)
  Returns a new pauses manager.
- #resume {|topic, partition| ... } ⇒ Object
  Resumes processing of partitions whose pause time has ended.
Constructor Details
#initialize ⇒ Karafka::Connection::PausesManager
Returns a new pauses manager.
    # File 'lib/karafka/connection/pauses_manager.rb', line 9
    def initialize
      @pauses = Hash.new do |h, k|
        h[k] = {}
      end
    end
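The constructor relies on a `Hash.new` default block so that each topic key gets its own inner hash on first access. The following is a minimal standalone sketch of that behavior only; the symbols used as keys and values are placeholders chosen for illustration and are not what Karafka stores.

```ruby
# Standalone illustration of the nested-hash default block.
# No Karafka classes are involved; keys and values are placeholders.
pauses = Hash.new do |h, k|
  h[k] = {}
end

# Reading a missing key runs the block, which stores and returns a fresh Hash,
# so the partition-level assignment works without any prior setup.
pauses[:events][0] = :tracker_for_partition_0
pauses[:events][1] = :tracker_for_partition_1

# A different topic key gets its own inner hash, so topics do not share state.
pauses[:logs][0] = :another_tracker
```

Because the block assigns `h[k] = {}`, the inner hash is stored rather than recreated on each lookup, which is what lets later calls find the same per-partition entries.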
Instance Method Details
#fetch(topic, partition) ⇒ Karafka::TimeTrackers::Pause
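Creates or fetches the pause tracker for a given topic partition. The tracker is built on first access from the topic's pause settings and reused on later calls.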
    # File 'lib/karafka/connection/pauses_manager.rb', line 20
    def fetch(topic, partition)
      @pauses[topic][partition] ||= TimeTrackers::Pause.new(
        timeout: topic.pause_timeout,
        max_timeout: topic.pause_max_timeout,
        exponential_backoff: topic.pause_with_exponential_backoff
      )
    end
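To make the create-or-fetch behavior concrete, here is a small sketch that mirrors the `||=` memoization in `#fetch` without loading Karafka. `TopicStub`, `TrackerStub`, and `FetchSketchManager` are hypothetical stand-ins invented for this example; the real method builds a `Karafka::TimeTrackers::Pause` from a real topic object, and the timeout values below are arbitrary.

```ruby
# Hypothetical stand-ins; these are NOT Karafka classes.
TopicStub = Struct.new(
  :name, :pause_timeout, :pause_max_timeout, :pause_with_exponential_backoff
)
TrackerStub = Struct.new(:timeout, :max_timeout, :exponential_backoff, keyword_init: true)

class FetchSketchManager
  def initialize
    @pauses = Hash.new { |h, k| h[k] = {} }
  end

  # Same shape as #fetch: build the tracker once per topic partition,
  # then return the stored instance on every later call.
  def fetch(topic, partition)
    @pauses[topic][partition] ||= TrackerStub.new(
      timeout: topic.pause_timeout,
      max_timeout: topic.pause_max_timeout,
      exponential_backoff: topic.pause_with_exponential_backoff
    )
  end
end

# Arbitrary illustrative settings.
topic = TopicStub.new('events', 1_000, 30_000, true)
manager = FetchSketchManager.new

first = manager.fetch(topic, 0)
again = manager.fetch(topic, 0) # same object as `first`
other = manager.fetch(topic, 1) # separate tracker for another partition
```

The point of the sketch is object identity: repeated calls for the same topic partition return one tracker, so any pause state it accumulates is preserved between calls.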
#resume {|topic, partition| ... } ⇒ Object
Resumes processing of partitions whose pause time has ended, yielding each resumed topic and partition to the given block.
    # File 'lib/karafka/connection/pauses_manager.rb', line 32
    def resume
      @pauses.each do |topic, partitions|
        partitions.each do |partition, pause|
          next unless pause.paused?
          next unless pause.expired?

          pause.resume

          yield(topic, partition)
        end
      end
    end
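The two guard clauses in `#resume` mean the block only runs for trackers that are both paused and expired. The sketch below replays that loop against a hypothetical `PauseStateStub` that exposes just the three methods the loop calls; its boolean flags replace the real time-based checks, and `ResumeSketchManager` is likewise an illustrative name, not part of Karafka.

```ruby
# Hypothetical stub exposing only what the resume loop uses.
# Real trackers decide expiry from elapsed time; here it is a fixed flag.
class PauseStateStub
  def initialize(paused:, expired:)
    @paused = paused
    @expired = expired
  end

  def paused?
    @paused
  end

  def expired?
    @expired
  end

  def resume
    @paused = false
  end
end

class ResumeSketchManager
  def initialize(pauses)
    @pauses = pauses
  end

  # Same loop shape as #resume: skip anything not paused or not yet expired.
  def resume
    @pauses.each do |topic, partitions|
      partitions.each do |partition, pause|
        next unless pause.paused?
        next unless pause.expired?

        pause.resume
        yield(topic, partition)
      end
    end
  end
end

pauses = {
  'events' => {
    0 => PauseStateStub.new(paused: true, expired: true),  # ready to resume
    1 => PauseStateStub.new(paused: true, expired: false), # still waiting
    2 => PauseStateStub.new(paused: false, expired: true)  # was never paused
  }
}

resumed = []
ResumeSketchManager.new(pauses).resume do |topic, partition|
  resumed << [topic, partition]
end
```

Only partition 0 reaches the block here. Note that the method yields unconditionally, so a caller is expected to pass a block.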