Class: Karafka::Instrumentation::AssignmentsTracker
- Inherits: Object
- Includes: Singleton
- Defined in: lib/karafka/instrumentation/assignments_tracker.rb
Overview
Keeps track of active assignments and materializes them by returning the routing topics together with the partitions assigned to them at a given moment.
It is auto-subscribed as part of Karafka itself.
It is not computationally heavy, as it only operates during rebalances.
We keep assignments as a flat, topic-keyed structure because we can go from topics to both subscription groups and consumer groups if needed.
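The benefit of the flat structure can be sketched as follows. This is only an illustration: `DemoSubscriptionGroup` and `DemoTopic` are hypothetical stand-ins, not the real Karafka routing classes, and the only assumption about a real topic is that it exposes `subscription_group` (as used in `#on_client_reset`).

```ruby
# Hypothetical stand-ins for routing objects; not the real Karafka classes
DemoSubscriptionGroup = Struct.new(:id)
DemoTopic = Struct.new(:name, :subscription_group)

sg_a = DemoSubscriptionGroup.new('sg_a')
sg_b = DemoSubscriptionGroup.new('sg_b')

# Flat structure in the same shape as the tracker keeps: topic => partitions
flat_assignments = {
  DemoTopic.new('orders', sg_a) => [0, 1],
  DemoTopic.new('payments', sg_a) => [2],
  DemoTopic.new('events', sg_b) => [0]
}

# Derive a per-subscription-group view from the flat one
by_subscription_group = flat_assignments
  .group_by { |topic, _partitions| topic.subscription_group.id }
  .transform_values do |pairs|
    pairs.to_h { |topic, partitions| [topic.name, partitions] }
  end
```

Going the other way (from a nested structure back to topics) would need extra bookkeeping, which is presumably why the flat form is kept.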
Instance Method Summary
- #clear ⇒ Object
  Clears all the assignments.
- #current ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
  Returns all the active/current assignments of this process.
- #initialize ⇒ AssignmentsTracker (constructor)
  A new instance of AssignmentsTracker.
- #on_client_reset(event) ⇒ Object
  When a client is being reset due to critical issues, removes all of its assignments, as it will receive a new set of assignments.
- #on_rebalance_partitions_assigned(event) ⇒ Object
  Adds partitions to the current assignments hash.
- #on_rebalance_partitions_revoked(event) ⇒ Object
  Removes partitions from the current assignments hash.
Constructor Details
#initialize ⇒ AssignmentsTracker
Returns a new instance of AssignmentsTracker.
    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 17
    def initialize
      @mutex = Mutex.new
      @assignments = Hash.new { |hash, key| hash[key] = [] }
    end
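The default block passed to `Hash.new` in the constructor is what lets the event handlers write `@assignments[topic] += ...` without checking whether the key exists. A minimal sketch of that behaviour, using symbols as hypothetical stand-ins for routing topics, and contrasting it with the shared-default form that would not work here:

```ruby
# Same default-block pattern as in the constructor: each missing key
# gets its own fresh array, which is stored under that key
assignments = Hash.new { |hash, key| hash[key] = [] }

assignments[:topic_a] += [0, 1]
assignments[:topic_b] << 3

# Contrast: a single default object is shared by all missing keys
# and is never stored in the hash
shared = Hash.new([])
shared[:topic_a] << 0
shared[:topic_b] << 1
```

With the block form, `assignments` ends up with two independent entries. With the shared default, `shared` stays empty while its one default array collects every value.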
Instance Method Details
#clear ⇒ Object
Clears all the assignments
    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 47
    def clear
      @mutex.synchronize do
        @assignments.clear
      end
    end
#current ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.
We return a frozen copy because the tracker modifies its internal state, and we do not want users to tamper with that data accidentally.
Returns all the active/current assignments of this process.
    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 31
    def current
      assignments = {}

      # Since the `@assignments` state can change during a rebalance, if we would iterate over
      # it exactly during state change, we would end up with the following error:
      # RuntimeError: can't add a new key into hash during iteration
      @mutex.synchronize do
        @assignments.each do |topic, partitions|
          assignments[topic] = partitions.dup.freeze
        end
      end

      assignments.freeze
    end
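The practical consequence of returning a frozen copy is that a result of `#current` is a point-in-time snapshot: it does not follow later rebalances and it cannot be modified. The sketch below illustrates this with `MiniTracker`, a hypothetical miniature written only for this example (its `assign` method is an assumption and not part of the real class), with strings standing in for routing topics.

```ruby
# Hypothetical miniature of the tracker, only to illustrate snapshot semantics
class MiniTracker
  def initialize
    @mutex = Mutex.new
    @assignments = Hash.new { |hash, key| hash[key] = [] }
  end

  # Illustrative helper; the real class is driven by rebalance events instead
  def assign(topic, partitions)
    @mutex.synchronize do
      @assignments[topic] += partitions
      @assignments[topic].sort!
    end
  end

  # Same copy-and-freeze approach as in the listing above
  def current
    snapshot = {}
    @mutex.synchronize do
      @assignments.each { |topic, partitions| snapshot[topic] = partitions.dup.freeze }
    end
    snapshot.freeze
  end
end

tracker = MiniTracker.new
tracker.assign('orders', [1, 0])

earlier = tracker.current
tracker.assign('orders', [2])
later = tracker.current

# Attempts to modify a snapshot raise FrozenError
mutation_blocked =
  begin
    earlier['orders'] << 9
    false
  rescue FrozenError
    true
  end
```

Code that needs up-to-date assignments should therefore call `#current` again rather than hold on to an old result.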
#on_client_reset(event) ⇒ Object
When a client is being reset due to critical issues, removes all of its assignments, as it will receive a new set of assignments.

    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 56
    def on_client_reset(event)
      sg = event[:subscription_group]

      @mutex.synchronize do
        @assignments.delete_if do |topic, _partitions|
          topic.subscription_group.id == sg.id
        end
      end
    end
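Note that the reset only drops topics belonging to the subscription group named in the event, so assignments of other groups survive. A small sketch of that filtering, where `StubSubscriptionGroup`, `StubTopic` and the plain-hash event are hypothetical stand-ins for the real objects:

```ruby
# Hypothetical stand-ins; not the real Karafka routing classes
StubSubscriptionGroup = Struct.new(:id)
StubTopic = Struct.new(:name, :subscription_group)

reset_group = StubSubscriptionGroup.new('sg_reset')
other_group = StubSubscriptionGroup.new('sg_other')

tracked = {
  StubTopic.new('orders', reset_group) => [0, 1],
  StubTopic.new('payments', other_group) => [0],
  StubTopic.new('events', reset_group) => [3]
}

# Assumed event shape: only the :subscription_group key is used by the handler
event = { subscription_group: reset_group }

sg = event[:subscription_group]
tracked.delete_if { |topic, _partitions| topic.subscription_group.id == sg.id }

# Only the topic from the untouched subscription group is left
remaining = tracked.transform_keys(&:name)
```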
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash
    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 87
    def on_rebalance_partitions_assigned(event)
      sg = event[:subscription_group]

      @mutex.synchronize do
        event[:tpl].to_h.each do |topic, partitions|
          topic = sg.topics.find(topic)

          @assignments[topic] += partitions.map(&:partition)
          @assignments[topic].sort!
        end
      end
    end
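Because the handler appends to whatever is already tracked, consecutive assignment events accumulate, and the sort keeps the partition list ordered. A sketch of that accumulation, assuming the event's `tpl` converts to a hash of topic name to partition objects responding to `#partition`. `AssignedPartition` is a hypothetical stand-in, and the lookup of the routing topic via `sg.topics.find` is skipped, so plain topic names are used as keys.

```ruby
# Hypothetical stand-in for the partition objects found in event[:tpl]
AssignedPartition = Struct.new(:partition)

accumulated = Hash.new { |hash, key| hash[key] = [] }

# Two consecutive assignment payloads: topic name => partition objects
first_batch = {
  'orders' => [AssignedPartition.new(2), AssignedPartition.new(0)]
}
second_batch = {
  'orders' => [AssignedPartition.new(1)],
  'events' => [AssignedPartition.new(0)]
}

[first_batch, second_batch].each do |tpl_hash|
  tpl_hash.each do |topic, partitions|
    # Same two steps as in the handler: append, then keep the list sorted
    accumulated[topic] += partitions.map(&:partition)
    accumulated[topic].sort!
  end
end
```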
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash
    # File 'lib/karafka/instrumentation/assignments_tracker.rb', line 69
    def on_rebalance_partitions_revoked(event)
      sg = event[:subscription_group]

      @mutex.synchronize do
        event[:tpl].to_h.each do |topic, partitions|
          topic = sg.topics.find(topic)

          @assignments[topic] -= partitions.map(&:partition)
          @assignments[topic].sort!
          # Remove completely topics for which we do not have any assignments left
          @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
        end
      end
    end
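Revocation is the mirror image of assignment, with one extra step: a topic that loses its last partition is removed from the hash entirely instead of staying behind with an empty array. A sketch under the same assumptions as the handler's payload shape. `RevokedPartition` is a hypothetical stand-in, and plain topic names replace the routing topics.

```ruby
# Hypothetical stand-in for the partition objects found in event[:tpl]
RevokedPartition = Struct.new(:partition)

owned = Hash.new { |hash, key| hash[key] = [] }
owned['orders'] = [0, 1, 2]
owned['events'] = [0]

# Revocation payload: topic name => partition objects being taken away
revoked = {
  'orders' => [RevokedPartition.new(1)],
  'events' => [RevokedPartition.new(0)]
}

revoked.each do |topic, partitions|
  owned[topic] -= partitions.map(&:partition)
  owned[topic].sort!
  # Drop topics that no longer have any partitions assigned
  owned.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
end
```

Pruning empty entries means that key presence in the result of `#current` can be read as "this process owns at least one partition of the topic".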