Class: Karafka::Processing::ExecutorsBuffer
- Inherits:
-
Object
- Object
- Karafka::Processing::ExecutorsBuffer
- Defined in:
- lib/karafka/processing/executors_buffer.rb
Overview
Buffer for executors of a given subscription group. It wraps around the concept of building and caching them, so we can re-use them instead of creating new each time.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears the executors buffer.
-
#each {|karafka, partition, given| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info.
-
#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all the executors available for a given topic partition.
-
#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all existing executors for given topic partition or creates one for it.
-
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::Executor
Finds or creates an executor based on the provided details.
- #initialize(client, subscription_group) ⇒ ExecutorsBuffer constructor
-
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
Constructor Details
#initialize(client, subscription_group) ⇒ ExecutorsBuffer
15 16 17 18 19 20 |
# File 'lib/karafka/processing/executors_buffer.rb', line 15 def initialize(client, subscription_group) @subscription_group = subscription_group @client = client # We need two layers here to keep track of topics, partitions and processing groups @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } } end |
Instance Method Details
#clear ⇒ Object
Clears the executors buffer. Useful for critical errors recovery.
85 86 87 |
# File 'lib/karafka/processing/executors_buffer.rb', line 85 def clear @buffer.clear end |
#each {|karafka, partition, given| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info
74 75 76 77 78 79 80 81 82 |
# File 'lib/karafka/processing/executors_buffer.rb', line 74 def each @buffer.each_value do |partitions| partitions.each_value do |executors| executors.each_value do |executor| yield(executor) end end end end |
#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all the executors available for a given topic partition
65 66 67 |
# File 'lib/karafka/processing/executors_buffer.rb', line 65 def find_all(topic, partition) @buffer[topic][partition].values end |
#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all existing executors for given topic partition or creates one for it
42 43 44 45 46 47 48 |
# File 'lib/karafka/processing/executors_buffer.rb', line 42 def find_all_or_create(topic, partition, coordinator) existing = find_all(topic, partition) return existing unless existing.empty? [find_or_create(topic, partition, 0, coordinator)] end |
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::Executor
Finds or creates an executor based on the provided details
29 30 31 32 33 34 35 |
# File 'lib/karafka/processing/executors_buffer.rb', line 29 def find_or_create(topic, partition, parallel_key, coordinator) @buffer[topic][partition][parallel_key] ||= executor_class.new( @subscription_group.id, @client, coordinator ) end |
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages
55 56 57 |
# File 'lib/karafka/processing/executors_buffer.rb', line 55 def revoke(topic, partition) @buffer[topic][partition].clear end |