Class: Karafka::Instrumentation::AssignmentsTracker
- Inherits:
- Object
- Includes:
- Singleton
- Defined in:
- lib/karafka/instrumentation/assignments_tracker.rb
Overview
Keeps track of active assignments and materializes them by returning the routing topics with the appropriate partitions that are assigned at a given moment.
It is auto-subscribed as part of Karafka itself.
It is not heavy from the computational point of view, as it only operates during rebalances.
We keep assignments as flat topics structure because we can go from topics to both subscription and consumer groups if needed.
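The flat structure can be illustrated with a small sketch. The `RoutedTopic`, `RoutedSubscriptionGroup`, and `RoutedConsumerGroup` structs are hypothetical stand-ins, not the real Karafka routing classes. The sketch only assumes that a topic exposes `subscription_group` (as used in the handlers documented on this page) and `consumer_group` (an assumption made for illustration).

```ruby
# Hypothetical stand-ins for routing objects (not the real Karafka classes).
RoutedConsumerGroup = Struct.new(:id)
RoutedSubscriptionGroup = Struct.new(:id)
RoutedTopic = Struct.new(:name, :consumer_group, :subscription_group)

cg = RoutedConsumerGroup.new('app')
sg_a = RoutedSubscriptionGroup.new('sg_a')
sg_b = RoutedSubscriptionGroup.new('sg_b')

# Flat structure: topic => sorted partition numbers, the same shape as #current.
flat = {
  RoutedTopic.new('orders', cg, sg_a) => [0, 1],
  RoutedTopic.new('payments', cg, sg_a) => [2],
  RoutedTopic.new('events', cg, sg_b) => [0]
}

# Regroup by subscription group only when it is actually needed.
by_subscription_group =
  flat
  .group_by { |topic, _partitions| topic.subscription_group.id }
  .transform_values { |pairs| pairs.to_h { |topic, partitions| [topic.name, partitions] } }

# Regroup by consumer group in the same way.
by_consumer_group =
  flat
  .group_by { |topic, _partitions| topic.consumer_group.id }
  .transform_values { |pairs| pairs.map { |topic, _partitions| topic.name } }
```

Because each topic carries references to its groups, the grouped views can be derived on demand and do not need to be stored.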
Instance Method Summary
-
#clear ⇒ Object
Clears all the assignments.
-
#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Returns all the active/current assignments of this given process.
-
#initialize ⇒ AssignmentsTracker
constructor
Initializes the assignments tracker with empty assignments.
-
#inspect ⇒ String
Thread-safe and lock-safe inspect implementation.
-
#on_client_events_poll(event) ⇒ Object
Handles events_poll notification to detect assignment loss. This is called regularly (every tick_interval), so we check whether the assignment was lost.
-
#on_client_reset(event) ⇒ Object
When client is under reset due to critical issues, remove all of its assignments as we will get a new set of assignments.
-
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash.
-
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash.
Constructor Details
#initialize ⇒ AssignmentsTracker
Initializes the assignments tracker with empty assignments.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 18
def initialize
  @mutex = Mutex.new
  @assignments = Hash.new { |hash, key| hash[key] = [] }
end
Instance Method Details
#clear ⇒ Object
Clears all the assignments.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 48
def clear
  @mutex.synchronize do
    @assignments.clear
  end
end
#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.
We return a copy because we modify the internals and do not want the user to tamper with the data accidentally.
Returns all the active/current assignments of this given process.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 32
def current
  assignments = {}

  # Since the `@assignments` state can change during a rebalance, if we would iterate over
  # it exactly during state change, we would end up with the following error:
  #   RuntimeError: can't add a new key into hash during iteration
  @mutex.synchronize do
    @assignments.each do |topic, partitions|
      assignments[topic] = partitions.dup.freeze
    end
  end

  assignments.freeze
end
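The copy-under-lock behaviour of #current can be tried out in isolation. The following is a minimal sketch, assuming a hypothetical `SnapshotTracker` class that uses plain strings instead of routing topics. It is not the Karafka implementation, only an illustration of why the returned value is a frozen copy.

```ruby
# Hypothetical class illustrating the frozen-snapshot pattern.
class SnapshotTracker
  def initialize
    @mutex = Mutex.new
    @assignments = Hash.new { |hash, key| hash[key] = [] }
  end

  # Simplified stand-in for the rebalance callbacks.
  def assign(topic, partitions)
    @mutex.synchronize do
      @assignments[topic] = (@assignments[topic] + partitions).sort
    end
  end

  # Returns a frozen snapshot so callers cannot alter the internal state.
  def current
    snapshot = {}

    @mutex.synchronize do
      @assignments.each { |topic, partitions| snapshot[topic] = partitions.dup.freeze }
    end

    snapshot.freeze
  end
end

tracker = SnapshotTracker.new
tracker.assign('orders', [1, 0])
snapshot = tracker.current

# Later changes do not affect a snapshot that was already taken.
tracker.assign('orders', [2])

# Attempting to modify the snapshot raises instead of touching tracker state.
mutation_blocked =
  begin
    snapshot['orders'] << 5
    false
  rescue FrozenError
    true
  end
```

A snapshot reflects the assignments at the moment of the call, so code that needs up-to-date data should call #current again instead of caching the result.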
#inspect ⇒ String
Returns thread-safe and lock-safe inspect implementation.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 55
def inspect
  info = if @mutex.try_lock
    begin
      assignments = @assignments.dup.transform_keys(&:name).inspect
      "assignments=#{assignments}"
    ensure
      @mutex.unlock
    end
  else
    'busy'
  end

  "#<#{self.class.name} #{info}>"
end
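The effect of using `try_lock` instead of `synchronize` can be seen in a short sketch. `BusyAwareState` is a hypothetical class written for this illustration. It mirrors the shape of the #inspect method above but is not part of Karafka.

```ruby
# Hypothetical class showing the non-blocking inspect pattern.
class BusyAwareState
  attr_reader :mutex

  def initialize
    @mutex = Mutex.new
    @data = { 'orders' => [0, 1] }
  end

  def inspect
    info =
      if @mutex.try_lock
        begin
          "data=#{@data.inspect}"
        ensure
          @mutex.unlock
        end
      else
        'busy'
      end

    "#<#{self.class.name} #{info}>"
  end
end

state = BusyAwareState.new

# Lock is free: inspect acquires it briefly and prints the data.
free_output = state.inspect

# Lock is held: inspect reports "busy" instead of waiting. A nested
# `synchronize` from the same thread would raise ThreadError here.
held_output = state.mutex.synchronize { state.inspect }
```

This keeps #inspect safe to call from logging or error-reporting code that may run while the lock is already held.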
#on_client_events_poll(event) ⇒ Object
We can run #assignment_lost? on each events poll because these polls happen once every 5 seconds during processing, plus prior to each messages poll. It takes 0.6 microseconds per call.
Handles events_poll notification to detect assignment loss. This is called regularly (every tick_interval), so we check whether the assignment was lost.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 90
def on_client_events_poll(event)
  client = event[:caller]

  # Only clear assignments if they were actually lost
  return unless client.assignment_lost?

  # Cleaning happens the same way as with the consumer reset
  on_client_reset(event)
end
#on_client_reset(event) ⇒ Object
When client is under reset due to critical issues, remove all of its assignments as we will get a new set of assignments.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 73
def on_client_reset(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    @assignments.delete_if do |topic, _partitions|
      topic.subscription_group.id == sg.id
    end
  end
end
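The interplay between the events-poll handler and the reset handler can be sketched without Karafka. `SgStub`, `TopicStub`, and `ClientStub` are hypothetical stand-ins that model only the attributes the handlers read. The event hashes are simplified assumptions, and the mutex is omitted because the sketch is single-threaded.

```ruby
# Hypothetical stand-ins; only the attributes used by the handlers are modelled.
SgStub = Struct.new(:id)
TopicStub = Struct.new(:name, :subscription_group)
ClientStub = Struct.new(:lost) do
  def assignment_lost?
    lost
  end
end

sg_a = SgStub.new('sg_a')
sg_b = SgStub.new('sg_b')

tracked = {
  TopicStub.new('orders', sg_a) => [0, 1],
  TopicStub.new('events', sg_b) => [0]
}

# Mirrors the reset logic: drop every topic owned by the given subscription group.
reset = lambda do |event|
  sg = event[:subscription_group]
  tracked.delete_if { |topic, _partitions| topic.subscription_group.id == sg.id }
end

# Mirrors the events-poll logic: reset only when the client reports a lost assignment.
on_poll = lambda do |event|
  reset.call(event) if event[:caller].assignment_lost?
end

# A healthy client leaves the tracked assignments untouched.
on_poll.call({ caller: ClientStub.new(false), subscription_group: sg_a })
after_healthy_poll = tracked.keys.map(&:name)

# A client that lost its assignment clears only its own subscription group.
on_poll.call({ caller: ClientStub.new(true), subscription_group: sg_a })
after_lost_poll = tracked.keys.map(&:name)
```

Topics that belong to other subscription groups stay tracked, since the removal is scoped by subscription group id.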
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 121
def on_rebalance_partitions_assigned(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] += partitions.map(&:partition)
      @assignments[topic].sort!
    end
  end
end
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 103
def on_rebalance_partitions_revoked(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] -= partitions.map(&:partition)
      @assignments[topic].sort!

      # Remove completely topics for which we do not have any assignments left
      @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
    end
  end
end
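The bookkeeping performed by the assigned and revoked handlers comes down to array addition and subtraction on a hash with a default value. The sketch below is a hypothetical simplification: it uses string topic names and integer partitions directly, and skips the mutex and the topic partition list conversion.

```ruby
# Same default-value hash shape as the tracker state, keyed by plain strings here.
partitions_by_topic = Hash.new { |hash, key| hash[key] = [] }

# Mirrors the assigned handler: append the partitions and keep the list sorted.
assign = lambda do |topic, partitions|
  partitions_by_topic[topic] += partitions
  partitions_by_topic[topic].sort!
end

# Mirrors the revoked handler: subtract the partitions and drop empty topics.
revoke = lambda do |topic, partitions|
  partitions_by_topic[topic] -= partitions
  partitions_by_topic.delete_if { |_topic, current| current.empty? }
end

assign.call('orders', [2, 0])
assign.call('orders', [1])
assign.call('events', [0])
after_assign = partitions_by_topic.to_h { |topic, partitions| [topic, partitions.dup] }

revoke.call('orders', [0, 2])
revoke.call('events', [0])
after_revoke = partitions_by_topic.to_h { |topic, partitions| [topic, partitions.dup] }
```

Removing topics with no partitions left means the result of #current lists only topics that are actually assigned.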