Class: Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced

Inherits: Base < Object
Defined in:
lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb

Overview

Balanced distributor that groups messages by partition key, assigns larger groups first to the least-loaded virtual partition, and keeps messages within each virtual partition in offset order.

Instance Method Summary

Methods inherited from Base

#initialize

Constructor Details

This class inherits a constructor from Karafka::Pro::Processing::VirtualPartitions::Distributors::Base

Instance Method Details

#call(messages) ⇒ Hash<Integer, Array<Karafka::Messages::Message>>

Returns hash with group ids as keys and message groups as values.

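Parameters:

  • messages (Array<Karafka::Messages::Message>) — messages to distribute across virtual partitions

Returns:

  • (Hash<Integer, Array<Karafka::Messages::Message>>) — hash with group ids as keys and message groups as values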

(The listing below spans lines 17–44 of the source file.)
# File 'lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb', line 17

# Distributes messages across virtual partitions, balancing them by message count.
#
# Messages are grouped by the configured partitioner key, so every message that
# shares a key ends up in the same virtual partition. Groups are then assigned
# largest-first, each to the virtual partition with the smallest message count so far.
#
# @param messages [Array<Karafka::Messages::Message>] messages to distribute
# @return [Hash<Integer, Array<Karafka::Messages::Message>>] virtual partition index
#   mapped to its messages sorted by offset; partitions that received no messages
#   are omitted
def call(messages)
  # Group messages by partition key
  key_groupings = messages.group_by { |msg| config.partitioner.call(msg) }

  worker_loads = Array.new(config.max_partitions, 0)
  worker_assignments = Array.new(config.max_partitions) { [] }

  # Sort keys by workload in descending order so the biggest groups are placed first
  sorted_keys = key_groupings.keys.sort_by { |key| -key_groupings[key].size }

  # Assign each key to the worker with the least current load
  sorted_keys.each do |key|
    min_load_worker = worker_loads.each_with_index.min_by { |load, _| load }[1]
    # Distinct name so the block does not reassign the method's `messages` argument
    key_messages = key_groupings[key]

    # Append in place; `+=` would copy the worker's whole array for every key
    worker_assignments[min_load_worker].concat(key_messages)
    worker_loads[min_load_worker] += key_messages.size
  end

  # Drop workers that got nothing and restore offset order within each worker
  # (groups were appended key by key, so offsets are interleaved at this point)
  worker_assignments
    .each_with_index
    .reject { |group_messages, _| group_messages.empty? }
    .map! { |group_messages, index| [index, group_messages.sort_by!(&:offset)] }
    .to_h
end