Class: Karafka::Pro::Connection::Manager
- Inherits:
-
Connection::Manager
- Object
- Connection::Manager
- Karafka::Pro::Connection::Manager
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/connection/manager.rb
Overview
Manager operations relate to consumer groups and not subscription groups. Since cluster operations can cause consumer group wide effects, we always apply only one change on a consumer group.
Manager that can handle working with multiplexed connections.
This manager takes into consideration the number of partitions assigned to the topics and does its best to balance. Additional connections may not always be utilized because alongside of them, other processes may “hijack” the assignment. In such cases those extra empty connections will be turned off after a while.
Instance Method Summary collapse
-
#control ⇒ Object
Shuts down all the listeners when it is time (including moving to quiet) or rescales when it is needed.
-
#initialize(scale_delay = 60 * 1_000) ⇒ Manager
constructor
How long should we wait after a rebalance before doing anything on a consumer group.
-
#notice(subscription_group_id, statistics) ⇒ Object
Collects data from the statistics about given subscription group.
-
#register(listeners) ⇒ Object
Registers listeners and starts the scaling procedures.
Methods inherited from Connection::Manager
Constructor Details
#initialize(scale_delay = 60 * 1_000) ⇒ Manager
How long should we wait after a rebalance before doing anything on a consumer group
35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/karafka/pro/connection/manager.rb', line 35 def initialize(scale_delay = 60 * 1_000) super() @scale_delay = scale_delay @mutex = Mutex.new @changes = Hash.new do |h, k| h[k] = { state: '', join_state: '', state_age: 0, changed_at: monotonic_now } end end |
Instance Method Details
#control ⇒ Object
Shuts down all the listeners when it is time (including moving to quiet) or rescales when it is needed
102 103 104 |
# File 'lib/karafka/pro/connection/manager.rb', line 102 def control Karafka::App.done? ? shutdown : rescale end |
#notice(subscription_group_id, statistics) ⇒ Object
Please note that while we collect here per subscription group, we use those metrics collectively on a whole consumer group. This reduces the friction.
Collects data from the statistics about given subscription group. This is used to ensure that we do not rescale short after rebalances, deployments, etc.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/karafka/pro/connection/manager.rb', line 81 def notice(subscription_group_id, statistics) times = [] # stateage is in microseconds # We monitor broker changes to make sure we do not introduce extra friction times << statistics['brokers'].values.map { |stats| stats['stateage'] }.min / 1_000 times << statistics['cgrp']['rebalance_age'] times << statistics['cgrp']['stateage'] # Keep the previous change age for changes that were triggered by us previous_changed_at = @changes[subscription_group_id][:changed_at] @changes[subscription_group_id].merge!( state_age: times.min, changed_at: previous_changed_at, join_state: statistics['cgrp']['join_state'], state: statistics['cgrp']['state'] ) end |
#register(listeners) ⇒ Object
Registers listeners and starts the scaling procedures
When using dynamic multiplexing, it will start the absolute minimum of connections for subscription group available.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/karafka/pro/connection/manager.rb', line 55 def register(listeners) @listeners = listeners # Preload all the keys into the hash so we never add keys to changes but just change them listeners.each { |listener| @changes[listener.subscription_group.id] } in_sg_families do |first_subscription_group, sg_listeners| multiplexing = first_subscription_group.multiplexing if multiplexing.active? && multiplexing.dynamic? # Start as many boot listeners as user wants. If not configured, starts half of max. sg_listeners.first(multiplexing.boot).each(&:start!) else sg_listeners.each(&:start!) end end end |