Class: Karafka::Processing::ExecutorsBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/processing/executors_buffer.rb

Overview

Buffer for executors of a given subscription group. It wraps around the concept of building and caching them, so we can re-use them instead of creating new each time.

Instance Method Summary collapse

Constructor Details

#initialize(client, subscription_group) ⇒ ExecutorsBuffer

Parameters:



15
16
17
18
19
20
# File 'lib/karafka/processing/executors_buffer.rb', line 15

def initialize(client, subscription_group)
  @subscription_group = subscription_group
  @client = client
  # We need two layers here to keep track of topics, partitions and processing groups
  @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end

Instance Method Details

#clearObject

Clears the executors buffer. Useful for critical errors recovery.



85
86
87
# File 'lib/karafka/processing/executors_buffer.rb', line 85

def clear
  @buffer.clear
end

#each {|karafka, partition, given| ... } ⇒ Object

Iterates over all available executors and yields them together with topic and partition info

Yield Parameters:



74
75
76
77
78
79
80
81
82
# File 'lib/karafka/processing/executors_buffer.rb', line 74

def each
  @buffer.each_value do |partitions|
    partitions.each_value do |executors|
      executors.each_value do |executor|
        yield(executor)
      end
    end
  end
end

#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::Executor>

Finds all the executors available for a given topic partition

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:



65
66
67
# File 'lib/karafka/processing/executors_buffer.rb', line 65

def find_all(topic, partition)
  @buffer[topic][partition].values
end

#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::Executor>

Finds all existing executors for given topic partition or creates one for it

Parameters:

Returns:



42
43
44
45
46
47
48
# File 'lib/karafka/processing/executors_buffer.rb', line 42

def find_all_or_create(topic, partition, coordinator)
  existing = find_all(topic, partition)

  return existing unless existing.empty?

  [find_or_create(topic, partition, 0, coordinator)]
end

#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::Executor

Finds or creates an executor based on the provided details

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • parallel_key (String)

    parallel group key

  • coordinator (Karafka::Processing::Coordinator)

Returns:



29
30
31
32
33
34
35
# File 'lib/karafka/processing/executors_buffer.rb', line 29

def find_or_create(topic, partition, parallel_key, coordinator)
  @buffer[topic][partition][parallel_key] ||= executor_class.new(
    @subscription_group.id,
    @client,
    coordinator
  )
end

#revoke(topic, partition) ⇒ Object

Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



55
56
57
# File 'lib/karafka/processing/executors_buffer.rb', line 55

def revoke(topic, partition)
  @buffer[topic][partition].clear
end