Class: Karafka::Connection::RebalanceManager
- Inherits: Object
  - Object
  - Karafka::Connection::RebalanceManager
- Defined in: lib/karafka/connection/rebalance_manager.rb
Overview
Manager for tracking changes in the partition assignment after the assignment is done, and for ensuring that the buffer-related operations affected by the rebalance state are applied.
We need to track these changes to clean up consumers that will no longer process given partitions because those partitions were taken away.
Since rebalances do not happen very often, we try to reuse the same objects for the empty states most of the time, so that we do not create many objects during the manager's lifetime.
Internally, the rebalance manager has a notion of lost partitions. Lost partitions are those that were revoked and were not assigned back. We do not expose this concept outside; we normalize them to revoked, because from the perspective of the rest of the code only the lost partitions are truly revoked.
For cooperative-sticky, #assigned_partitions holds only the recently assigned partitions, not all the partitions that are owned.
We need the subscription group reference (subscription_group_id) because there is a global pipeline for notifications, and we must track changes only for events that are relevant to our subscription group.
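The notion of lost partitions described above can be illustrated with a small sketch. The `lost_partitions` helper below is hypothetical and is not part of Karafka; it only assumes that both states are hashes mapping a topic name to an array of partition numbers, which is the shape the callbacks on this page produce.

```ruby
# Hypothetical helper (not Karafka API): returns, per topic, the partitions
# that were revoked and were not assigned back.
def lost_partitions(revoked, assigned)
  revoked.each_with_object({}) do |(topic, partitions), lost|
    remaining = partitions - assigned.fetch(topic, [])
    # Topics whose partitions all came back are skipped entirely
    lost[topic] = remaining unless remaining.empty?
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1], 'logs' => [0] }

# Only 'events' partitions 0 and 2 were not assigned back
lost = lost_partitions(revoked, assigned)
```

Under this reading, only the entries in `lost` would need consumer cleanup, which matches the statement that only lost partitions are truly revoked.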
Instance Attribute Summary
- #assigned_partitions ⇒ Object (readonly)
  Returns the value of attribute assigned_partitions.
- #revoked_partitions ⇒ Object (readonly)
  Returns the value of attribute revoked_partitions.
Instance Method Summary
- #active? ⇒ Boolean
  True if there was at least one rebalance.
- #changed? ⇒ Boolean
  Indicates a state change in the partitions assignment.
- #clear ⇒ Object
  Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
- #initialize(subscription_group_id, buffer) ⇒ RebalanceManager (constructor)
- #on_rebalance_partitions_assigned(event) ⇒ Object
  Callback that kicks in inside of rdkafka when new partitions are assigned.
- #on_rebalance_partitions_revoked(event) ⇒ Object
  Callback that kicks in inside of rdkafka when partitions are revoked.
Constructor Details
#initialize(subscription_group_id, buffer) ⇒ RebalanceManager
    # File 'lib/karafka/connection/rebalance_manager.rb', line 37

    def initialize(subscription_group_id, buffer)
      @assigned_partitions = {}
      @revoked_partitions = {}
      @changed = false
      @active = false
      @subscription_group_id = subscription_group_id
      @buffer = buffer

      # Connects itself to the instrumentation pipeline so rebalances can be tracked
      ::Karafka.monitor.subscribe(self)
    end
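Because the constructor subscribes the manager to a shared notifications pipeline, every manager receives every rebalance event and has to filter by subscription group. The sketch below illustrates that pattern with stand-ins: `SketchMonitor` and `SketchListener` are hypothetical classes written for this example and do not reproduce the actual `Karafka.monitor` implementation.

```ruby
# Hypothetical stand-in for a global notifications pipeline
class SketchMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  # Delivers the event to every listener that implements the handler
  def publish(handler, event)
    @listeners.each do |listener|
      listener.public_send(handler, event) if listener.respond_to?(handler)
    end
  end
end

# Hypothetical listener that keeps only events for its own subscription group
class SketchListener
  attr_reader :seen

  def initialize(subscription_group_id, monitor)
    @subscription_group_id = subscription_group_id
    @seen = []
    monitor.subscribe(self)
  end

  def on_rebalance_partitions_assigned(event)
    # Same guard as in the callbacks on this page
    return unless event[:subscription_group_id] == @subscription_group_id

    @seen << event
  end
end

monitor = SketchMonitor.new
first = SketchListener.new('group_a', monitor)
second = SketchListener.new('group_b', monitor)

# Both listeners receive the event, but only the first one records it
monitor.publish(:on_rebalance_partitions_assigned, { subscription_group_id: 'group_a' })
```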
Instance Attribute Details
#assigned_partitions ⇒ Object (readonly)
Returns the value of attribute assigned_partitions.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 30

    def assigned_partitions
      @assigned_partitions
    end
#revoked_partitions ⇒ Object (readonly)
Returns the value of attribute revoked_partitions.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 30

    def revoked_partitions
      @revoked_partitions
    end
Instance Method Details
#active? ⇒ Boolean
This method is needed to make sure that, when using cooperative-sticky, we do not close the connection until the first rebalance has happened. Otherwise librdkafka may crash.
Returns true if there was at least one rebalance.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 67

    def active?
      @active
    end
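One way a caller might use `#active?` before closing is to wait, with a timeout, until the first rebalance has been observed. The sketch below is only an illustration of that idea: `wait_until_active`, its `timeout` and `interval` parameters, and the `SketchManager` stand-in are all hypothetical, and this is not how Karafka itself is confirmed to implement the check.

```ruby
# Hypothetical guard: polls manager.active? until it is true or the timeout passes.
# Returns true when the manager became active, false on timeout.
def wait_until_active(manager, timeout: 1.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  until manager.active?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(interval)
  end

  true
end

# Minimal stand-in exposing only #active?
SketchManager = Struct.new(:active) do
  def active?
    active
  end
end

already_rebalanced = wait_until_active(SketchManager.new(true))
never_rebalanced = wait_until_active(SketchManager.new(false), timeout: 0.05)
```

A caller could then decide to skip or delay the close when the guard returns false.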
#changed? ⇒ Boolean
Returns true if the partitions assignment state has changed.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 59

    def changed?
      @changed
    end
#clear ⇒ Object
Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 52

    def clear
      @assigned_partitions.clear
      @revoked_partitions.clear
      @changed = false
    end
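The reset-then-poll order can be sketched as follows. `SketchRebalanceState` is a simplified stand-in written for this example that mirrors only the state handling shown on this page; its `revoke` method is hypothetical and simulates a revocation callback firing during a poll, with no rdkafka involved.

```ruby
# Simplified stand-in for the manager state (not the Karafka class itself)
class SketchRebalanceState
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  # Hypothetical: simulates a revocation arriving while polling
  def revoke(topic, partitions)
    @revoked_partitions = { topic => partitions }
    @changed = true
  end
end

state = SketchRebalanceState.new
observed = []

# Two simulated poll iterations; a rebalance happens only during the first
polls = [-> { state.revoke('events', [0]) }, -> {}]

polls.each do |poll|
  state.clear # reset before polling so stale changes are not seen again
  poll.call   # rebalance callbacks may fire here
  observed << state.changed?
end
```

Without the `clear` call at the top of each iteration, the second iteration would still report the change from the first one.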
#on_rebalance_partitions_assigned(event) ⇒ Object
Callback that kicks in inside of rdkafka when new partitions are assigned.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 75

    def on_rebalance_partitions_assigned(event)
      # Apply changes only for our subscription group
      return unless event[:subscription_group_id] == @subscription_group_id

      @active = true
      @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
      @changed = true
    end
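The transformation applied to `event[:tpl]` in the callback above reduces each topic's partition entries to plain partition numbers. The sketch below shows that step in isolation. `SketchPartition` and `SketchTpl` are hypothetical stand-ins; the assumption is that the real topic partition list converts via `#to_h` to a hash of topic name to an array of objects responding to `#partition`.

```ruby
# Hypothetical stand-in for a partition entry; only #partition is used
SketchPartition = Struct.new(:partition, :offset)

# Hypothetical stand-in for the topic partition list carried in event[:tpl]
class SketchTpl
  def initialize(data)
    @data = data
  end

  # Assumed shape: topic name => array of partition entries
  def to_h
    @data
  end
end

tpl = SketchTpl.new(
  {
    'events' => [SketchPartition.new(0, 10), SketchPartition.new(3, 42)],
    'logs' => [SketchPartition.new(1, 7)]
  }
)

# Same expression as in the callback: keep only the partition numbers per topic
assigned = tpl.to_h.transform_values { |parts| parts.map(&:partition) }
```

The result is the topic-to-partition-numbers hash shape that `#assigned_partitions` and `#revoked_partitions` expose.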
#on_rebalance_partitions_revoked(event) ⇒ Object
Callback that kicks in inside of rdkafka when partitions are revoked.
    # File 'lib/karafka/connection/rebalance_manager.rb', line 88

    def on_rebalance_partitions_revoked(event)
      # Apply changes only for our subscription group
      return unless event[:subscription_group_id] == @subscription_group_id

      @active = true
      @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
      @changed = true
    end