Class: Karafka::Connection::RebalanceManager

Inherits:
Object
Defined in:
lib/karafka/connection/rebalance_manager.rb

Overview

Note:

Since rebalances do not happen often, we try to reuse the same objects for the empty states most of the time, so that we do not create many objects during the manager's lifetime.
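The reuse mentioned above is what #clear relies on: it empties the existing hashes in place rather than assigning new ones. A minimal illustration of that plain Ruby behaviour, independent of Karafka:

```ruby
# Hash#clear empties a hash in place, so the same object can be reused
# for the empty state instead of allocating a new Hash on every reset
partitions = { 'events' => [0, 1] }
original_id = partitions.object_id

partitions.clear

partitions.empty?                   # true
partitions.object_id == original_id # true: no new Hash was created
```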

Note:

Internally, the rebalance manager has a notion of lost partitions: partitions that were revoked and not re-assigned back. We do not expose this concept outside; instead, we normalize lost partitions to revoked ones, because from the perspective of the rest of the code only lost partitions are truly revoked.
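A minimal sketch of the lost-partition idea, assuming both inputs use the same shape as #assigned_partitions and #revoked_partitions (topic name mapped to an array of partition ids). The helper name `lost_partitions` is hypothetical and is not part of Karafka's API.

```ruby
# Hypothetical helper: a partition is "lost" when it was revoked and was not
# handed back to us in the assignment that followed the revocation
def lost_partitions(revoked, assigned)
  revoked.each_with_object({}) do |(topic, partitions), lost|
    remaining = partitions - assigned.fetch(topic, [])
    lost[topic] = remaining unless remaining.empty?
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1] }

# Partition 1 of "events" came back, so only the rest counts as lost
lost = lost_partitions(revoked, assigned)
# returns a hash equal to { 'events' => [0, 2], 'logs' => [0] }
```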

Note:

With the cooperative-sticky assignment strategy, #assigned_partitions holds only the recently assigned partitions, not all of the partitions that are owned.
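Because under cooperative-sticky #assigned_partitions only carries the delta, code that needs the full set of owned partitions has to accumulate it across rebalances. A sketch under that assumption; `OwnershipTracker` is a hypothetical class, not something Karafka provides.

```ruby
require 'set'

# Hypothetical tracker: folds incremental (cooperative-sticky) assignment and
# revocation deltas into the full set of partitions currently owned
class OwnershipTracker
  def initialize
    # topic name => Set of partition ids
    @owned = Hash.new { |hash, topic| hash[topic] = Set.new }
  end

  # @param delta [Hash{String => Array<Integer>}] newly assigned partitions
  def assign(delta)
    delta.each { |topic, partitions| @owned[topic].merge(partitions) }
  end

  # @param delta [Hash{String => Array<Integer>}] revoked partitions
  def revoke(delta)
    delta.each do |topic, partitions|
      next unless @owned.key?(topic)

      @owned[topic].subtract(partitions)
      @owned.delete(topic) if @owned[topic].empty?
    end
  end

  # @return [Hash{String => Array<Integer>}] sorted snapshot of owned partitions
  def owned
    @owned.transform_values { |set| set.to_a.sort }
  end
end

tracker = OwnershipTracker.new
tracker.assign({ 'events' => [0, 1] })
tracker.assign({ 'events' => [2] }) # cooperative-sticky: only the new partition
tracker.revoke({ 'events' => [0] })
tracker.owned # returns { 'events' => [1, 2] }
```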

Note:

We need the subscription group reference because notifications go through a global pipeline, and we must make sure that we track only changes relevant to our subscription group.
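Why the filter matters can be sketched with a tiny stand-in for a global pipeline, where every subscriber receives every event. `TinyMonitor` and `GroupListener` are hypothetical classes, and the `partitions:` payload key is a simplification of the real `tpl:` payload so the snippet runs without Karafka or rdkafka.

```ruby
# Hypothetical stand-in for a global notifications pipeline: every subscriber
# receives every event, so each subscriber must filter for itself
class TinyMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  # Calls e.g. #on_rebalance_partitions_assigned for 'rebalance.partitions_assigned'
  def instrument(event_id, payload)
    method_name = "on_#{event_id.tr('.', '_')}"
    @listeners.each do |listener|
      listener.public_send(method_name, payload) if listener.respond_to?(method_name)
    end
  end
end

# Hypothetical listener mirroring the subscription group check in RebalanceManager
class GroupListener
  attr_reader :assigned

  def initialize(subscription_group_id, monitor)
    @subscription_group_id = subscription_group_id
    @assigned = {}
    monitor.subscribe(self)
  end

  def on_rebalance_partitions_assigned(event)
    # Ignore events emitted for other subscription groups
    return unless event[:subscription_group_id] == @subscription_group_id

    @assigned = event[:partitions]
  end
end

monitor = TinyMonitor.new
group_a = GroupListener.new('group_a', monitor)
group_b = GroupListener.new('group_b', monitor)

monitor.instrument(
  'rebalance.partitions_assigned',
  { subscription_group_id: 'group_a', partitions: { 'events' => [0] } }
)

group_a.assigned # returns { 'events' => [0] }
group_b.assigned # returns {} because the event was for group_a
```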

Manager for tracking changes in the partition assignment after the assignment is done, and for ensuring that buffer-related operations that may be affected by the rebalance state are applied properly.

We need to track these changes so that we can clean up consumers that will no longer process given partitions because those partitions were taken away.


Constructor Details

#initialize(subscription_group_id, buffer) ⇒ RebalanceManager

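Parameters:

  • subscription_group_id: id of the subscription group whose rebalance events this manager tracks

  • buffer: messages buffer on which rebalance-related cleanup operations are applied
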

# File 'lib/karafka/connection/rebalance_manager.rb', line 37

def initialize(subscription_group_id, buffer)
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
  @active = false
  @subscription_group_id = subscription_group_id
  @buffer = buffer

  # Connects itself to the instrumentation pipeline so rebalances can be tracked
  ::Karafka.monitor.subscribe(self)
end

Instance Attribute Details

#assigned_partitions ⇒ Object (readonly)

Returns the partitions from the most recent assignment since the last #clear, as a hash mapping topic names to arrays of partition ids.



# File 'lib/karafka/connection/rebalance_manager.rb', line 30

def assigned_partitions
  @assigned_partitions
end

#revoked_partitions ⇒ Object (readonly)

Returns the partitions from the most recent revocation since the last #clear, as a hash mapping topic names to arrays of partition ids.



# File 'lib/karafka/connection/rebalance_manager.rb', line 30

def revoked_partitions
  @revoked_partitions
end

Instance Method Details

#active? ⇒ Boolean

Note:

This method is needed to make sure that, when using cooperative-sticky, we do not close the consumer until the first rebalance has happened. Otherwise librdkafka may crash.

Returns true if there was at least one rebalance.

Returns:

  • (Boolean)

    true if there was at least one rebalance



# File 'lib/karafka/connection/rebalance_manager.rb', line 67

def active?
  @active
end
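A minimal sketch of gating a close on #active?, assuming the caller can simply retry later when the manager is not yet active. `FakeManager`, `FakeConsumer` and `close_if_safe` are hypothetical stand-ins; how Karafka itself waits for the first rebalance is not shown on this page.

```ruby
# Hypothetical stand-in exposing only #active?
FakeManager = Struct.new(:active) do
  def active?
    active
  end
end

# Hypothetical consumer whose #close must not run before the first rebalance
class FakeConsumer
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Closes only once at least one rebalance was seen; returns whether it closed
def close_if_safe(consumer, rebalance_manager)
  return false unless rebalance_manager.active?

  consumer.close
  true
end

consumer = FakeConsumer.new
close_if_safe(consumer, FakeManager.new(false)) # false: no rebalance yet
close_if_safe(consumer, FakeManager.new(true))  # true: consumer closed
```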

#changed? ⇒ Boolean

Returns whether the partition assignment changed since the last #clear.

Returns:

  • (Boolean)

    true if a rebalance callback changed the partition assignment since the last #clear



# File 'lib/karafka/connection/rebalance_manager.rb', line 59

def changed?
  @changed
end

#clear ⇒ Object

Resets the rebalance manager state. This needs to be done before each polling loop, because the state may change during polling.



# File 'lib/karafka/connection/rebalance_manager.rb', line 52

def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
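The clear-then-poll cycle can be sketched with a simplified stand-in that reproduces only the #clear and #changed? behaviour shown above. `MiniRebalanceState`, its `assigned!` method and the list of simulated poll results are hypothetical; in Karafka the callbacks are fired by rdkafka during a real poll.

```ruby
# Simplified stand-in reproducing only the #clear / #changed? behaviour
class MiniRebalanceState
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  # Simulates the assignment callback firing during a poll
  def assigned!(partitions)
    @assigned_partitions = partitions
    @changed = true
  end
end

state = MiniRebalanceState.new
changes = []

# Hypothetical poll results: a rebalance happens only during the second poll
[nil, { 'events' => [0, 1] }, nil].each do |rebalance|
  state.clear                             # reset before each polling round
  state.assigned!(rebalance) if rebalance # callback fired during the poll
  changes << state.changed?
end

changes # returns [false, true, false]
```

Without the #clear call at the start of each round, the third entry would still report true, because nothing resets the flag.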

#on_rebalance_partitions_assigned(event) ⇒ Object

Callback invoked from within rdkafka when new partitions are assigned.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/connection/rebalance_manager.rb', line 75

def on_rebalance_partitions_assigned(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
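The callback flattens the topic partition list into a hash of topic name to partition ids. A sketch of that transformation, assuming `event[:tpl].to_h` yields topic names mapped to arrays of partition objects that respond to #partition; a Struct (`FakePartition`, hypothetical) stands in for those objects so the snippet runs without rdkafka.

```ruby
# Stand-in for rdkafka partition objects; only #partition is used here
FakePartition = Struct.new(:partition, :offset)

# Shape assumed for event[:tpl].to_h: topic name => array of partition objects
tpl_hash = {
  'events' => [FakePartition.new(0, nil), FakePartition.new(3, nil)],
  'logs' => [FakePartition.new(1, nil)]
}

# Same transformation as in on_rebalance_partitions_assigned
assigned = tpl_hash.transform_values { |part| part.map(&:partition) }
# returns a hash equal to { 'events' => [0, 3], 'logs' => [1] }
```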

#on_rebalance_partitions_revoked(event) ⇒ Object

Callback invoked from within rdkafka when partitions are revoked.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/connection/rebalance_manager.rb', line 88

def on_rebalance_partitions_revoked(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true

  remove_revoked_and_duplicated_messages
end
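The body of #remove_revoked_and_duplicated_messages is not shown on this page. A minimal sketch of the revoked-partition part only, assuming a hypothetical buffer that groups raw messages by `[topic, partition]`; the real buffer class and its duplicate handling are not modelled here.

```ruby
# Hypothetical buffer: [topic, partition] => array of raw messages
buffer = {
  ['events', 0] => ['m1', 'm2'],
  ['events', 1] => ['m3'],
  ['logs', 0] => ['m4']
}

revoked_partitions = { 'events' => [0] }

# Drop buffered messages for partitions we no longer own, so that they are
# not dispatched to consumers after the revocation
revoked_partitions.each do |topic, partitions|
  partitions.each { |partition| buffer.delete([topic, partition]) }
end

buffer.keys # returns [['events', 1], ['logs', 0]]
```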