Class: Karafka::Instrumentation::AssignmentsTracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/instrumentation/assignments_tracker.rb

Overview

Keeps track of active assignments and materializes them by returning the routing topics together with the partitions that are assigned to them at a given moment.

It is auto-subscribed as part of Karafka itself.

It is not heavy from the computational point of view, as it only operates during rebalances.

We keep assignments as a flat structure keyed by topic because we can go from topics to both subscription groups and consumer groups if needed.
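The flat layout described above can be illustrated with a small sketch. The `DemoConsumerGroup`, `DemoSubscriptionGroup`, and `DemoTopic` structs and both helper methods are hypothetical stand-ins written for this example, not Karafka's routing classes. They only assume that a topic can reach its subscription group and, through it, its consumer group.

```ruby
# Hypothetical stand-ins for Karafka routing objects (illustration only)
DemoConsumerGroup = Struct.new(:id)
DemoSubscriptionGroup = Struct.new(:id, :consumer_group)
DemoTopic = Struct.new(:name, :subscription_group) do
  # A topic reaches its consumer group through its subscription group
  def consumer_group
    subscription_group.consumer_group
  end
end

# Regroups a flat { topic => partitions } hash by subscription group id
def by_subscription_group(assignments)
  assignments.group_by { |topic, _partitions| topic.subscription_group.id }
end

# Regroups a flat { topic => partitions } hash by consumer group id
def by_consumer_group(assignments)
  assignments.group_by { |topic, _partitions| topic.consumer_group.id }
end

cg = DemoConsumerGroup.new('app')
sg_a = DemoSubscriptionGroup.new('sg_a', cg)
sg_b = DemoSubscriptionGroup.new('sg_b', cg)

# Flat structure: each topic maps directly to its assigned partition numbers
assignments = {
  DemoTopic.new('orders', sg_a) => [0, 1, 2],
  DemoTopic.new('payments', sg_b) => [4]
}

by_subscription_group(assignments).keys # one entry per subscription group
by_consumer_group(assignments).keys     # a single entry for the shared consumer group
```

Because the grouping is derived on demand, the tracker itself never needs a nested structure.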

Constructor Details

#initialize ⇒ AssignmentsTracker

Initializes the assignments tracker with empty assignments.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 18

def initialize
  @mutex = Mutex.new
  @assignments = Hash.new { |hash, key| hash[key] = [] }
end

Instance Method Details

#clear ⇒ Object

Clears all the assignments.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 48

def clear
  @mutex.synchronize do
    @assignments.clear
  end
end

#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}

Note:

Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.

Note:

We return a frozen copy because we modify the internal state and we do not want the user to tamper with the data accidentally.

Returns all the active/current assignments of this process.

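Returns:

  • (Hash{Karafka::Routing::Topic => Array<Integer>})

    all the active/current assignments of this process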



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 32

def current
  assignments = {}

  # Since the `@assignments` state can change during a rebalance, iterating over it
  # exactly during a state change would end with the following error:
  #   RuntimeError: can't add a new key into hash during iteration
  @mutex.synchronize do
    @assignments.each do |topic, partitions|
      assignments[topic] = partitions.dup.freeze
    end
  end

  assignments.freeze
end
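The snapshot behaviour of `#current` can be reproduced in isolation. The sketch below uses a hypothetical `SnapshotTracker` class with string topic names instead of routing topics. Its `#assign` method is an assumption added so the example has something to snapshot. Only the copy-under-lock and freeze steps mirror the listing above.

```ruby
# Hypothetical, simplified tracker (illustration only, not Karafka's class)
class SnapshotTracker
  def initialize
    @mutex = Mutex.new
    @assignments = Hash.new { |hash, key| hash[key] = [] }
  end

  # Assumed helper: adds partitions for a topic under the lock
  def assign(topic, partitions)
    @mutex.synchronize do
      @assignments[topic] += partitions
      @assignments[topic].sort!
    end
  end

  # Copies the state under the lock and returns a deeply frozen snapshot
  def current
    snapshot = {}

    @mutex.synchronize do
      @assignments.each do |topic, partitions|
        snapshot[topic] = partitions.dup.freeze
      end
    end

    snapshot.freeze
  end
end

tracker = SnapshotTracker.new
tracker.assign('orders', [2, 0])

snapshot = tracker.current
tracker.assign('orders', [1])

snapshot['orders']        # still the two partitions captured earlier
tracker.current['orders'] # a fresh snapshot that includes partition 1

begin
  snapshot['orders'] << 9
rescue FrozenError
  # Both the hash and its partition arrays are frozen, so mutation is rejected
end
```

A snapshot is a point-in-time view, so callers that need up-to-date data should call `#current` again rather than hold on to an old result.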

#inspect ⇒ String

Returns thread-safe and lock-safe inspect implementation.

Returns:

  • (String)

    thread-safe and lock-safe inspect implementation



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 55

def inspect
  info = if @mutex.try_lock
           begin
             assignments = @assignments.dup.transform_keys(&:name).inspect
             "assignments=#{assignments}"
           ensure
             @mutex.unlock
           end
         else
           'busy'
         end

  "#<#{self.class.name} #{info}>"
end
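The reason for `try_lock` in place of `synchronize` can be seen in a minimal sketch. `BusyAwareInspect` and its `#with_lock` helper are hypothetical names used only here. Ruby's `Mutex` is not reentrant, so a blocking lock taken from a thread that already holds it would raise a `ThreadError`, while `try_lock` simply returns `false`.

```ruby
# Hypothetical class demonstrating a non-blocking inspect (illustration only)
class BusyAwareInspect
  def initialize
    @mutex = Mutex.new
    @state = { 'orders' => [0, 1] }
  end

  # Assumed helper: runs the block while the internal lock is held,
  # standing in for a state change that is in progress
  def with_lock(&block)
    @mutex.synchronize(&block)
  end

  # Never blocks: reports 'busy' when the lock cannot be taken immediately
  def inspect
    info = if @mutex.try_lock
             begin
               "state=#{@state.inspect}"
             ensure
               @mutex.unlock
             end
           else
             'busy'
           end

    "#<#{self.class.name} #{info}>"
  end
end

object = BusyAwareInspect.new

object.inspect                      # includes the state, as the lock is free
object.with_lock { object.inspect } # reports busy instead of raising or waiting
```

This keeps `inspect` usable from error handlers and debuggers even when it is called while the lock is held.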

#on_client_events_poll(event) ⇒ Object

Note:

We can run #assignment_lost? on each events poll because these polls happen once every 5 seconds during processing, plus prior to each messages poll. It takes 0.6 microseconds per call.

Handles the events_poll notification to detect assignment loss. This is called regularly (every tick_interval), so we check whether the assignment was lost.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 90

def on_client_events_poll(event)
  client = event[:caller]

  # Only clear assignments if they were actually lost
  return unless client.assignment_lost?

  # Cleaning happens the same way as with the consumer reset
  on_client_reset(event)
end

#on_client_reset(event) ⇒ Object

When a client is being reset due to critical issues, remove all of its assignments, as we will get a new set of assignments.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 73

def on_client_reset(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    @assignments.delete_if do |topic, _partitions|
      topic.subscription_group.id == sg.id
    end
  end
end
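The per-subscription-group cleanup above can be sketched without Karafka. `ResetSubGroup`, `ResetTopic`, and `drop_subscription_group` are hypothetical names for this example. The only assumption is that each topic exposes a subscription group with an `id`, as the listing above uses.

```ruby
# Hypothetical stand-ins for routing objects (illustration only)
ResetSubGroup = Struct.new(:id)
ResetTopic = Struct.new(:name, :subscription_group)

# Removes every topic that belongs to the given subscription group,
# leaving the assignments of other subscription groups untouched
def drop_subscription_group(assignments, subscription_group)
  assignments.delete_if do |topic, _partitions|
    topic.subscription_group.id == subscription_group.id
  end
end

sg_one = ResetSubGroup.new('sg_1')
sg_two = ResetSubGroup.new('sg_2')

assignments = {
  ResetTopic.new('orders', sg_one) => [0, 1],
  ResetTopic.new('logs', sg_two) => [3]
}

drop_subscription_group(assignments, sg_one)

assignments.keys.map(&:name) # only topics from the second subscription group remain
```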

#on_rebalance_partitions_assigned(event) ⇒ Object

Adds partitions to the current assignments hash.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 121

def on_rebalance_partitions_assigned(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] += partitions.map(&:partition)
      @assignments[topic].sort!
    end
  end
end

#on_rebalance_partitions_revoked(event) ⇒ Object

Removes partitions from the current assignments hash.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 103

def on_rebalance_partitions_revoked(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] -= partitions.map(&:partition)
      @assignments[topic].sort!
      # Remove completely topics for which we do not have any assignments left
      @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
    end
  end
end
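The assign and revoke handlers come down to array arithmetic on a hash with a default block. The sketch below isolates that arithmetic with two hypothetical helpers, `assign!` and `revoke!`, and uses string topic names. It leaves out the mutex and the event payload handling shown in the listings above.

```ruby
# Adds partitions for a topic and keeps the list sorted
def assign!(assignments, topic, partitions)
  assignments[topic] += partitions
  assignments[topic].sort!
end

# Removes partitions for a topic and drops topics with nothing left
def revoke!(assignments, topic, partitions)
  assignments[topic] -= partitions
  assignments.delete_if { |_topic, current| current.empty? }
end

# Same default block as the tracker: unknown topics start with an empty list
assignments = Hash.new { |hash, key| hash[key] = [] }

assign!(assignments, 'orders', [2, 0])
assign!(assignments, 'orders', [1])
revoke!(assignments, 'orders', [0, 2])
assignments.fetch('orders') # only partition 1 is left

revoke!(assignments, 'orders', [1])
assignments.key?('orders')  # false, the empty topic was removed
```

Checking with `key?` or `fetch` matters here: a plain `assignments['orders']` lookup would trigger the default block and re-create the key with an empty array.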