Class: Karafka::Processing::CoordinatorsBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/processing/coordinators_buffer.rb

Overview

Note:

This buffer operates only from the listener loop, thus we do not have to make it thread-safe.

Coordinators builder used to build coordinators per topic partition

It provides direct pauses access for revocation

Instance Method Summary collapse

Constructor Details

#initialize(topics) ⇒ CoordinatorsBuffer

Returns a new instance of CoordinatorsBuffer.

Parameters:



17
18
19
20
21
# File 'lib/karafka/processing/coordinators_buffer.rb', line 17

def initialize(topics)
  @pauses_manager = Connection::PausesManager.new
  @coordinators = Hash.new { |h, k| h[k] = {} }
  @topics = topics
end

Instance Method Details

#find_or_create(topic_name, partition) ⇒ Object

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/karafka/processing/coordinators_buffer.rb', line 25

def find_or_create(topic_name, partition)
  @coordinators[topic_name][partition] ||= begin
    routing_topic = @topics.find(topic_name)

    coordinator_class.new(
      routing_topic,
      partition,
      @pauses_manager.fetch(routing_topic, partition)
    )
  end
end

#resetObject

Clears coordinators and re-created the pauses manager This should be used only for critical errors recovery



59
60
61
62
# File 'lib/karafka/processing/coordinators_buffer.rb', line 59

def reset
  @pauses_manager = Connection::PausesManager.new
  @coordinators.clear
end

#resume(&block) {|topic, partition| ... } ⇒ Object

Resumes processing of partitions for which pause time has ended.

Parameters:

  • block

    we want to run for resumed topic partitions

Yield Parameters:

  • topic (String)

    name

  • partition (Integer)

    number



41
42
43
# File 'lib/karafka/processing/coordinators_buffer.rb', line 41

def resume(&block)
  @pauses_manager.resume(&block)
end

#revoke(topic_name, partition) ⇒ Object

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number



47
48
49
50
51
52
53
54
55
# File 'lib/karafka/processing/coordinators_buffer.rb', line 47

def revoke(topic_name, partition)
  return unless @coordinators[topic_name].key?(partition)

  # The fact that we delete here does not change the fact that the executor still holds the
  # reference to this coordinator. We delete it here, as we will no longer process any
  # new stuff with it and we may need a new coordinator if we regain this partition, but the
  # coordinator may still be in use
  @coordinators[topic_name].delete(partition).revoke
end