Class: Karafka::Pro::Connection::Manager

Inherits:
Connection::Manager
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/connection/manager.rb

Overview

Note:

Manager operations relate to consumer groups, not subscription groups. Since cluster operations can have consumer-group-wide effects, we always apply only one change at a time to a consumer group.

Manager that can handle working with multiplexed connections.

This manager takes into consideration the number of partitions assigned to the topics and does its best to balance the connections. Additional connections may not always be utilized because other processes running alongside them may "hijack" the assignment. In such cases, the extra empty connections will be turned off after a while.

Instance Method Summary

Methods inherited from Connection::Manager

#done?

Constructor Details

#initialize(scale_delay = 60 * 1_000) ⇒ Manager

How long to wait after a rebalance before doing anything on a consumer group.

Parameters:

  • scale_delay (Integer) (defaults to: 60 * 1_000)

    How long to wait, in milliseconds, before making any changes. Any change related to this consumer group will postpone the scaling operations. This prevents too much friction in the cluster. It is 1 minute by default.



# File 'lib/karafka/pro/connection/manager.rb', line 27

def initialize(scale_delay = 60 * 1_000)
  super()
  @scale_delay = scale_delay
  @mutex = Mutex.new
  @changes = Hash.new do |h, k|
    h[k] = {
      state: '',
      join_state: '',
      state_age: 0,
      changed_at: monotonic_now
    }
  end
end
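
The `@changes` hash relies on the default block of `Hash.new`, so a state record is created the first time a key is read. The following is a minimal sketch of that behavior, not the manager itself. `ChangesSketch` and the `'sg_0'` id are hypothetical, and `monotonic_now` is a stand-in that assumes a millisecond monotonic clock; the actual helper from `Core::Helpers::Time` may be implemented differently.

```ruby
# Hypothetical module used only for this illustration
module ChangesSketch
  # Stand-in for the Core::Helpers::Time helper (assumption: monotonic
  # clock value in milliseconds; the real helper may differ)
  def self.monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end

  # Builds a hash with the same default record shape as in #initialize
  def self.build
    Hash.new do |h, k|
      h[k] = {
        state: '',
        join_state: '',
        state_age: 0,
        changed_at: monotonic_now
      }
    end
  end
end

changes = ChangesSketch.build

# Reading a missing key creates and stores the default record
record = changes['sg_0'] # 'sg_0' is a made-up subscription group id

puts changes.key?('sg_0')           # the key exists after the first read
puts record.equal?(changes['sg_0']) # later reads return the stored record
```

Because the block assigns `h[k]`, simply reading each key is enough to preload it, which is the approach `#register` takes with its listeners.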

Instance Method Details

#control ⇒ Object

Shuts down all the listeners when it is time (including moving to quiet) or rescales when it is needed



# File 'lib/karafka/pro/connection/manager.rb', line 94

def control
  Karafka::App.done? ? shutdown : rescale
end

#notice(subscription_group_id, statistics) ⇒ Object

Note:

While statistics are collected here per subscription group, the metrics are used collectively across the whole consumer group. This reduces friction.

Collects data from the statistics about a given subscription group. This is used to ensure that we do not rescale shortly after rebalances, deployments, etc.

Parameters:

  • subscription_group_id (String)

    id of the subscription group for which statistics were emitted

  • statistics (Hash)

    emitted statistics



# File 'lib/karafka/pro/connection/manager.rb', line 73

def notice(subscription_group_id, statistics)
  times = []
  # stateage is in microseconds
  # We monitor broker changes to make sure we do not introduce extra friction
  times << statistics['brokers'].values.map { |stats| stats['stateage'] }.min / 1_000
  times << statistics['cgrp']['rebalance_age']
  times << statistics['cgrp']['stateage']

  # Keep the previous change age for changes that were triggered by us
  previous_changed_at = @changes[subscription_group_id][:changed_at]

  @changes[subscription_group_id].merge!(
    state_age: times.min,
    changed_at: previous_changed_at,
    join_state: statistics['cgrp']['join_state'],
    state: statistics['cgrp']['state']
  )
end
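
The age calculation in `#notice` can be tried on a hand-written statistics hash. The sketch below uses made-up numbers and a hypothetical `min_state_age` helper, and the sample contains only the keys that `#notice` reads. Broker `stateage` is divided by 1_000 to convert microseconds to milliseconds, following the comment in the method.

```ruby
# Hypothetical helper mirroring the age calculation from #notice
def min_state_age(statistics)
  times = []
  # Broker stateage is reported in microseconds, so convert to milliseconds
  times << statistics['brokers'].values.map { |stats| stats['stateage'] }.min / 1_000
  times << statistics['cgrp']['rebalance_age']
  times << statistics['cgrp']['stateage']
  # The youngest state decides how "fresh" the last change is
  times.min
end

# Made-up sample with only the keys used above
sample = {
  'brokers' => {
    'broker-1' => { 'stateage' => 90_000_000 }, # 90 s in microseconds
    'broker-2' => { 'stateage' => 45_000_000 }  # 45 s in microseconds
  },
  'cgrp' => {
    'rebalance_age' => 120_000, # milliseconds
    'stateage' => 70_000        # milliseconds
  }
}

puts min_state_age(sample) # => 45000
```

Assuming the manager compares this value against `scale_delay`, a group whose youngest state is 45_000 ms old would still be inside the default 60_000 ms waiting window.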

#register(listeners) ⇒ Object

Registers listeners and starts the scaling procedures

When dynamic multiplexing is used, only the absolute minimum number of connections available for each subscription group is started.

Parameters:



# File 'lib/karafka/pro/connection/manager.rb', line 47

def register(listeners)
  @listeners = listeners

  # Preload all the keys into the hash so we never add keys to changes but just change them
  listeners.each { |listener| @changes[listener.subscription_group.id] }

  in_sg_families do |first_subscription_group, sg_listeners|
    multiplexing = first_subscription_group.multiplexing

    if multiplexing.active? && multiplexing.dynamic?
      # Start as many boot listeners as user wants. If not configured, starts half of max.
      sg_listeners.first(multiplexing.boot).each(&:start!)
    else
      sg_listeners.each(&:start!)
    end
  end
end
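
The branching in `#register` can be illustrated with plain Ruby objects. In this sketch `FakeListener`, `FakeMultiplexing`, and `start_listeners` are hypothetical stand-ins, not Karafka classes. Only the `active?`, `dynamic?`, `boot`, and `start!` names are taken from the listing above.

```ruby
# Hypothetical stand-in for a listener; records whether it was started
FakeListener = Struct.new(:id, :started) do
  def start!
    self.started = true
  end
end

# Hypothetical stand-in for the multiplexing settings of a subscription group
FakeMultiplexing = Struct.new(:active, :dynamic, :boot) do
  def active?
    active
  end

  def dynamic?
    dynamic
  end
end

# Mirrors the if/else from #register for a single group of listeners
def start_listeners(multiplexing, sg_listeners)
  if multiplexing.active? && multiplexing.dynamic?
    # Dynamic mode: start only the configured number of boot listeners
    sg_listeners.first(multiplexing.boot).each(&:start!)
  else
    # Otherwise every listener is started right away
    sg_listeners.each(&:start!)
  end
end

listeners = Array.new(4) { |i| FakeListener.new(i, false) }
start_listeners(FakeMultiplexing.new(true, true, 2), listeners)

puts listeners.map(&:started).inspect # => [true, true, false, false]
```

In this sketch the listeners that were not started stay idle; in the manager they would remain available for later scaling decisions.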