Class: Karafka::Instrumentation::AssignmentsTracker
- Inherits:
-
Object
- Object
- Karafka::Instrumentation::AssignmentsTracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/instrumentation/assignments_tracker.rb
Overview
Keeps track of active assignments and materializes them by returning the routing topics with appropriate partitions that are assigned at a given moment
It is auto-subscribed as part of Karafka itself.
It is not heavy from the computational point of view, as it only operates during rebalances.
We keep assignments as flat topics structure because we can go from topics to both subscription and consumer groups if needed.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears all the assignments.
-
#current ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
Returns all the active/current assignments of this given process.
-
#initialize ⇒ AssignmentsTracker
constructor
A new instance of AssignmentsTracker.
-
#on_client_reset(event) ⇒ Object
When client is under reset due to critical issues, remove all of its assignments as we will get a new set of assignments.
-
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash.
-
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash.
Constructor Details
#initialize ⇒ AssignmentsTracker
Returns a new instance of AssignmentsTracker.
17 18 19 20 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 17 def initialize @mutex = Mutex.new @assignments = Hash.new { |hash, key| hash[key] = [] } end |
Instance Method Details
#clear ⇒ Object
Clears all the assignments
42 43 44 45 46 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 42 def clear @mutex.synchronize do @assignments.clear end end |
#current ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
Keep in mind, that those assignments can change any time, especially when working with multiple consumer groups or subscription groups.
We return a copy because we modify internals and we do not want user to tamper with the data accidentally
Returns all the active/current assignments of this given process
31 32 33 34 35 36 37 38 39 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 31 def current assignments = {} @assignments.each do |topic, partitions| assignments[topic] = partitions.dup.freeze end assignments.freeze end |
#on_client_reset(event) ⇒ Object
When client is under reset due to critical issues, remove all of its assignments as we will get a new set of assignments
51 52 53 54 55 56 57 58 59 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 51 def on_client_reset(event) sg = event[:subscription_group] @mutex.synchronize do @assignments.delete_if do |topic, _partitions| topic.subscription_group.id == sg.id end end end |
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 82 def on_rebalance_partitions_assigned(event) sg = event[:subscription_group] @mutex.synchronize do event[:tpl].to_h.each do |topic, partitions| topic = sg.topics.find(topic) @assignments[topic] += partitions.map(&:partition) @assignments[topic].sort! end end end |
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash
64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 64 def on_rebalance_partitions_revoked(event) sg = event[:subscription_group] @mutex.synchronize do event[:tpl].to_h.each do |topic, partitions| topic = sg.topics.find(topic) @assignments[topic] -= partitions.map(&:partition) @assignments[topic].sort! # Remove completely topics for which we do not have any assignments left @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? } end end end |