Class: Karafka::Instrumentation::AssignmentsTracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/instrumentation/assignments_tracker.rb

Overview

Keeps track of active assignments and materializes them by returning the routing topics together with the partitions assigned to them at a given moment.

It is auto-subscribed as part of Karafka itself.

It is computationally cheap, as it only does work during rebalances.

We keep assignments as a flat structure keyed by topic because we can go from topics to both subscription groups and consumer groups if needed.
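A minimal sketch of why a flat, topic-keyed hash is enough, assuming each topic can reach its subscription group and, through it, its consumer group. The `Topic`, `SubscriptionGroup` and `ConsumerGroup` structs below are hypothetical stand-ins for illustration, not Karafka's routing classes.

```ruby
# Hypothetical stand-ins for routing objects; not Karafka's real classes.
ConsumerGroup = Struct.new(:id)
SubscriptionGroup = Struct.new(:id, :consumer_group)
Topic = Struct.new(:name, :subscription_group)

cg = ConsumerGroup.new("app")
sg_a = SubscriptionGroup.new("sg_a", cg)
sg_b = SubscriptionGroup.new("sg_b", cg)

events = Topic.new("events", sg_a)
logs = Topic.new("logs", sg_a)
metrics = Topic.new("metrics", sg_b)

# Flat structure: topic => sorted partition numbers
assignments = {
  events => [0, 1],
  logs => [2],
  metrics => [0]
}

# Regroup the flat hash by subscription group id
by_subscription_group = assignments
  .group_by { |topic, _partitions| topic.subscription_group.id }
  .transform_values { |pairs| pairs.to_h { |topic, partitions| [topic.name, partitions] } }

# Collect topic names per consumer group id
by_consumer_group = Hash.new { |hash, key| hash[key] = [] }
assignments.each_key do |topic|
  by_consumer_group[topic.subscription_group.consumer_group.id] << topic.name
end
```

Both groupings are derived on demand, so the tracker itself does not need to store a nested structure.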


Constructor Details

#initialize ⇒ AssignmentsTracker

Returns a new instance of AssignmentsTracker.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 17

def initialize
  @mutex = Mutex.new
  @assignments = Hash.new { |hash, key| hash[key] = [] }
end
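The block form of `Hash.new` used in the constructor matters: it builds and stores a fresh array for every missing key. The sketch below contrasts it with a shared default object, using plain symbols as keys purely for illustration.

```ruby
# A block default builds a fresh array per missing key and stores it in the hash.
per_key = Hash.new { |hash, key| hash[key] = [] }
per_key[:a] += [0, 1]
per_key[:b] << 2

# A plain default object is shared by every missing key and is never stored.
shared = Hash.new([])
shared[:a] << 0
shared[:b] << 1

per_key_result = per_key.to_a   # both keys are present, each with its own array
shared_keys = shared.keys       # no key was ever stored
shared_default = shared[:c]     # the single shared array holds both appended values
```

One consequence of the block form is that merely reading a missing key creates an empty entry for it.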

Instance Method Details

#clear ⇒ Object

Clears all the assignments.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 47

def clear
  @mutex.synchronize do
    @assignments.clear
  end
end

#current ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>

Note:

Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.

Note:

We return a copy because we modify the internal state and do not want users to tamper with the data accidentally.

Returns all the active/current assignments of this process.

Returns:

  • (Hash<Karafka::Routing::Topic, Array<Integer>>)



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 31

def current
  assignments = {}

  # The `@assignments` state can change during a rebalance. If we iterated over it exactly
  # during such a change, we would end up with the following error:
  #   RuntimeError: can't add a new key into hash during iteration
  @mutex.synchronize do
    @assignments.each do |topic, partitions|
      assignments[topic] = partitions.dup.freeze
    end
  end

  assignments.freeze
end
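The snapshot behavior described in the notes above can be sketched in isolation. `SnapshotSketch` and its `assign` helper are hypothetical stand-ins that mirror the locking and copying shown in `#current`; plain strings are used in place of routing topics.

```ruby
# Hypothetical stand-in that mirrors the locking and copying done in #current.
class SnapshotSketch
  def initialize
    @mutex = Mutex.new
    @assignments = Hash.new { |hash, key| hash[key] = [] }
  end

  # Illustrative helper that plays the role of a rebalance callback
  def assign(topic, partitions)
    @mutex.synchronize do
      @assignments[topic] += partitions
      @assignments[topic].sort!
    end
  end

  # Returns a frozen hash of frozen array copies
  def current
    snapshot = {}
    @mutex.synchronize do
      @assignments.each { |topic, partitions| snapshot[topic] = partitions.dup.freeze }
    end
    snapshot.freeze
  end
end

tracker = SnapshotSketch.new
tracker.assign("events", [1, 0])
snapshot = tracker.current

# A later change does not leak into a snapshot taken earlier
tracker.assign("events", [2])
latest = tracker.current

# Attempts to modify the snapshot raise instead of silently corrupting state
mutation_blocked =
  begin
    snapshot["events"] << 99
    false
  rescue FrozenError
    true
  end
```

Because a snapshot is only valid for the moment it was taken, callers that need fresh data should call `current` again rather than hold on to an old result.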

#on_client_reset(event) ⇒ Object

When a client is being reset due to critical issues, remove all of its assignments, as we will get a new set of assignments.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 56

def on_client_reset(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    @assignments.delete_if do |topic, _partitions|
      topic.subscription_group.id == sg.id
    end
  end
end
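A minimal sketch of the filtering step, assuming the event payload exposes the affected subscription group under `:subscription_group` as in the method above. `StubGroup` and `StubTopic` are hypothetical structs that model only the attributes the filter reads, and a plain hash stands in for the event.

```ruby
# Hypothetical stand-ins; only the attributes used by the filter are modelled.
StubGroup = Struct.new(:id)
StubTopic = Struct.new(:name, :subscription_group)

sg_a = StubGroup.new("sg_a")
sg_b = StubGroup.new("sg_b")

assignments = {
  StubTopic.new("events", sg_a) => [0, 1],
  StubTopic.new("logs", sg_a) => [3],
  StubTopic.new("metrics", sg_b) => [0]
}

# Stand-in for the event payload: the subscription group whose client was reset
event = { subscription_group: sg_a }
reset_group = event[:subscription_group]

# Drop every topic that belongs to the reset subscription group
assignments.delete_if { |topic, _partitions| topic.subscription_group.id == reset_group.id }

remaining = assignments.transform_keys(&:name)
```

Topics owned by other subscription groups are left untouched, since their clients keep their assignments.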

#on_rebalance_partitions_assigned(event) ⇒ Object

Adds partitions to the current assignments hash.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 87

def on_rebalance_partitions_assigned(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] += partitions.map(&:partition)
      @assignments[topic].sort!
    end
  end
end

#on_rebalance_partitions_revoked(event) ⇒ Object

Removes partitions from the current assignments hash.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 69

def on_rebalance_partitions_revoked(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] -= partitions.map(&:partition)
      @assignments[topic].sort!
      # Remove completely topics for which we do not have any assignments left
      @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
    end
  end
end
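The bookkeeping performed by the assigned and revoked callbacks boils down to array addition and subtraction on a default-backed hash. The sketch below replays a short sequence; the `assign` and `revoke` lambdas are hypothetical helpers, plain strings stand in for routing topics, and locking is omitted for brevity.

```ruby
# Plain strings stand in for routing topics; integers for partition numbers.
assignments = Hash.new { |hash, key| hash[key] = [] }

# Mirrors the update done when partitions are assigned
assign = lambda do |topic, partitions|
  assignments[topic] += partitions
  assignments[topic].sort!
end

# Mirrors the update done when partitions are revoked
revoke = lambda do |topic, partitions|
  assignments[topic] -= partitions
  # Drop topics that no longer own any partition
  assignments.delete_if { |_topic, current| current.empty? }
end

assign.call("events", [2, 0])
assign.call("events", [1])
assign.call("logs", [0])
after_assign = assignments.transform_values(&:dup)

revoke.call("events", [0, 2])
revoke.call("logs", [0])
# Revoking an untracked topic creates an empty entry that is cleaned up right away
revoke.call("unknown", [5])
after_revoke = assignments.transform_values(&:dup)
```

Sorting after each assignment keeps the partition lists in a stable order regardless of the order in which partitions arrive.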