Class: Karafka::Connection::RebalanceManager
- Inherits: Object
- Defined in: lib/karafka/connection/rebalance_manager.rb
Overview
Manager for tracking changes in the partitions assignment after the assignment is done.
We need to track those changes to clean up consumers that will no longer process given partitions because those partitions were taken away.
Note: Since rebalances do not happen often, we try to reuse the same objects for the empty states most of the time, so we do not create many objects during the manager's lifetime.
Note: Internally, the rebalance manager has a notion of lost partitions: partitions that got revoked but were not re-assigned back. We do not expose this concept outside and instead normalize them to revoked, because from the perspective of the rest of the code only the lost partitions are truly revoked.
Note: For cooperative-sticky, #assigned_partitions holds only the recently assigned partitions, not all the partitions that are owned.
Note: We need the subscription_group reference because there is a global pipeline for notifications and we must track changes only for things that are relevant to our subscription group.
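The per-poll lifecycle described above can be sketched with a small stand-in class. This is only an illustration under stated assumptions: `SketchManager`, `assigned!` and `revoked!` are hypothetical names, the class does not subscribe to the Karafka monitor, and it takes plain hashes instead of rdkafka topic partition lists.

```ruby
# Hypothetical stand-in mirroring the flags documented on this page.
# Not the Karafka class: no monitor subscription, plain hashes as input.
class SketchManager
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  # Reset before each polling loop
  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  # Stand-ins for the two rebalance callbacks
  def assigned!(partitions)
    @assigned_partitions = partitions
    @changed = true
  end

  def revoked!(partitions)
    @revoked_partitions = partitions
    @changed = true
  end
end

manager = SketchManager.new

# Polling cycle 1: a rebalance fires while polling
manager.clear
manager.revoked!({ 'events' => [0, 1] })
manager.assigned!({ 'events' => [1] })
puts manager.changed? # a rebalance was observed in this cycle

# Polling cycle 2: nothing happens
manager.clear
puts manager.changed? # state was reset and no callback fired
```

The point of the sketch is the ordering: the state is reset first, the callbacks may then flip it during the poll, and the caller inspects it afterwards.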
Instance Attribute Summary
- #assigned_partitions ⇒ Object (readonly)
  Returns the value of attribute assigned_partitions.
- #revoked_partitions ⇒ Object (readonly)
  Returns the value of attribute revoked_partitions.
Instance Method Summary
- #active? ⇒ Boolean
  True if there was at least one rebalance.
- #changed? ⇒ Boolean
  Indicates a state change in the partitions assignment.
- #clear ⇒ Object
  Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
- #initialize(subscription_group_id) ⇒ RebalanceManager (constructor)
- #lost_partitions ⇒ Object
  We consider as lost only partitions that were taken away and not re-assigned back to us.
- #on_rebalance_partitions_assigned(event) ⇒ Object
  Callback that kicks in inside of rdkafka, when new partitions were assigned.
- #on_rebalance_partitions_revoked(event) ⇒ Object
  Callback that kicks in inside of rdkafka, when partitions were revoked.
Constructor Details
#initialize(subscription_group_id) ⇒ RebalanceManager
# File 'lib/karafka/connection/rebalance_manager.rb', line 34
def initialize(subscription_group_id)
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
  @active = false
  @subscription_group_id = subscription_group_id

  # Connects itself to the instrumentation pipeline so rebalances can be tracked
  ::Karafka.monitor.subscribe(self)
end
Instance Attribute Details
#assigned_partitions ⇒ Object (readonly)
Returns the value of attribute assigned_partitions.
# File 'lib/karafka/connection/rebalance_manager.rb', line 28
def assigned_partitions
  @assigned_partitions
end
#revoked_partitions ⇒ Object (readonly)
Returns the value of attribute revoked_partitions.
# File 'lib/karafka/connection/rebalance_manager.rb', line 28
def revoked_partitions
  @revoked_partitions
end
Instance Method Details
#active? ⇒ Boolean
Note: This method is needed to make sure that, when using cooperative-sticky, we do not close until the first rebalance; otherwise librdkafka may crash.
Returns true if there was at least one rebalance.
# File 'lib/karafka/connection/rebalance_manager.rb', line 63
def active?
  @active
end
#changed? ⇒ Boolean
Indicates a state change in the partitions assignment.
# File 'lib/karafka/connection/rebalance_manager.rb', line 55
def changed?
  @changed
end
#clear ⇒ Object
Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
# File 'lib/karafka/connection/rebalance_manager.rb', line 48
def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
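Because #clear calls `Hash#clear` rather than assigning fresh hashes, the existing objects are emptied in place. A minimal plain-Ruby sketch of that behavior, where `clear_in_place` is a hypothetical helper and the hash merely stands in for `@assigned_partitions`:

```ruby
# Plain-Ruby illustration; the hash stands in for @assigned_partitions
def clear_in_place(partitions)
  partitions.clear # empties the receiver and returns the same object
end

assigned = { 'events' => [0, 1] }
same_ref = assigned

clear_in_place(assigned)

puts assigned.empty?           # the hash no longer holds any topics
puts assigned.equal?(same_ref) # identity check: no new Hash was allocated
```

This appears to be what lets the manager avoid allocating new empty hashes on every polling loop, although the rebalance callbacks themselves do assign new hashes when they fire.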
#lost_partitions ⇒ Object
We consider as lost only partitions that were taken away and not re-assigned back to us.
# File 'lib/karafka/connection/rebalance_manager.rb', line 68
def lost_partitions
  lost_partitions = {}

  revoked_partitions.each do |topic, partitions|
    lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
  end

  lost_partitions
end
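To make the subtraction concrete, here is a worked example on plain hashes. `lost_from` is a hypothetical helper that takes both hashes as arguments; it is not part of Karafka, and the topic names are made up.

```ruby
EMPTY_ARRAY = [].freeze

# Same subtraction as in #lost_partitions, but on plain hashes passed in
# as arguments (hypothetical helper, not part of Karafka)
def lost_from(revoked, assigned)
  revoked.to_h do |topic, partitions|
    [topic, partitions - assigned.fetch(topic, EMPTY_ARRAY)]
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1, 3] }

lost = lost_from(revoked, assigned)

puts lost['events'].inspect # partition 1 came back, so only 0 and 2 are lost
puts lost['logs'].inspect   # 'logs' was not re-assigned at all, so 0 is lost
```

Note that a topic whose revoked partitions were all re-assigned still appears in the result, mapped to an empty array, so callers may want to treat an empty array as "nothing lost for this topic".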
#on_rebalance_partitions_assigned(event) ⇒ Object
Callback that kicks in inside of rdkafka, when new partitions were assigned.
# File 'lib/karafka/connection/rebalance_manager.rb', line 82
def on_rebalance_partitions_assigned(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
#on_rebalance_partitions_revoked(event) ⇒ Object
Callback that kicks in inside of rdkafka, when partitions were revoked.
# File 'lib/karafka/connection/rebalance_manager.rb', line 95
def on_rebalance_partitions_revoked(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
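Both callbacks share the same two steps: ignore events from other subscription groups, then reduce the topic partition list to a hash of partition ids per topic. The sketch below isolates those steps. It rests on assumptions: `FakePartition`, `FakeTpl` and `partitions_for` are hypothetical stand-ins, and the event is modeled as a plain Hash rather than a real instrumentation event.

```ruby
# Hypothetical stand-in for an object that responds to #partition
FakePartition = Struct.new(:partition)

# Hypothetical stand-in for a topic partition list that responds to #to_h
class FakeTpl
  def initialize(topics)
    @topics = topics
  end

  def to_h
    @topics
  end
end

# Returns { topic => [partition ids] } for events of our group, nil otherwise
def partitions_for(event, subscription_group_id)
  return unless event[:subscription_group_id] == subscription_group_id

  event[:tpl].to_h.transform_values { |parts| parts.map(&:partition) }
end

event = {
  subscription_group_id: 'group_a',
  tpl: FakeTpl.new({ 'events' => [FakePartition.new(0), FakePartition.new(2)] })
}

puts partitions_for(event, 'group_a').inspect # topic mapped to partition ids 0 and 2
puts partitions_for(event, 'group_b').inspect # nil: event belongs to another group
```

The early return is what keeps managers of different subscription groups from reacting to each other's rebalances on the shared notification pipeline.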