Class: Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced
- Inherits: Base
  - Object
    - Base
      - Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced
- Defined in: lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb
Overview
Balanced distributor that groups messages by partition key and assigns larger groups first, while preserving message order within each group.
Instance Method Summary
- #call(messages) ⇒ Hash{Integer => Array<Karafka::Messages::Message>}
Distributes messages to virtual partitions ensuring balanced load across workers by grouping messages by partition key and assigning larger groups first.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::VirtualPartitions::Distributors::Base
Instance Method Details
#call(messages) ⇒ Hash{Integer => Array<Karafka::Messages::Message>}
Distributes messages to virtual partitions, balancing load across workers by grouping messages by partition key and assigning larger groups first.
# File 'lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb', line 19

def call(messages)
  # Group messages by partition key
  key_groupings = messages.group_by { |msg| config.partitioner.call(msg) }

  worker_loads = Array.new(config.max_partitions, 0)
  worker_assignments = Array.new(config.max_partitions) { [] }

  # Sort keys by workload in descending order
  sorted_keys = key_groupings.keys.sort_by { |key| -key_groupings[key].size }

  # Assign each key to the worker with the least current load
  sorted_keys.each do |key|
    # Find worker with minimum current load
    min_load_worker = worker_loads.each_with_index.min_by { |load, _| load }[1]
    messages = key_groupings[key]

    # Assign this key to that worker
    worker_assignments[min_load_worker] += messages
    worker_loads[min_load_worker] += messages.size
  end

  # Combine messages for each worker and sort by offset
  worker_assignments
    .each_with_index
    .reject { |messages, _| messages.empty? }
    .map! { |messages, index| [index, messages.sort_by!(&:offset)] }
    .to_h
end
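The greedy assignment in #call can be tried outside Karafka with a small standalone sketch. Everything here is an assumption made for illustration: the `Message` struct stands in for Karafka::Messages::Message, `balanced_distribute` is a hypothetical helper (not part of the Karafka API), and the partitioner and partition count are passed as arguments rather than read from `config`.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message (illustration only)
Message = Struct.new(:key, :offset)

# Hypothetical helper mirroring the logic of #call without any Karafka config.
# partitioner:    callable that returns the grouping key for a message
# max_partitions: number of virtual partitions to spread the groups over
def balanced_distribute(messages, partitioner:, max_partitions:)
  groups = messages.group_by { |msg| partitioner.call(msg) }
  loads = Array.new(max_partitions, 0)
  assignments = Array.new(max_partitions) { [] }

  # Largest groups first, each one going to the currently least-loaded slot
  groups.values.sort_by { |group| -group.size }.each do |group|
    target = loads.each_with_index.min_by { |load, _| load }[1]
    assignments[target].concat(group)
    loads[target] += group.size
  end

  # Drop empty slots and restore offset order inside each remaining slot
  assignments
    .each_with_index
    .reject { |slot, _| slot.empty? }
    .map { |slot, index| [index, slot.sort_by(&:offset)] }
    .to_h
end

# Four keys with group sizes 4, 3, 2 and 1; offsets follow arrival order
keys = %w[a a a a b b b c c d]
messages = keys.each_with_index.map { |key, offset| Message.new(key, offset) }

result = balanced_distribute(
  messages,
  partitioner: ->(msg) { msg.key },
  max_partitions: 2
)

result.each { |index, slot| puts "#{index}: #{slot.map(&:offset).inspect}" }
```

With this input the two virtual partitions end up with five messages each: the groups of 4 and 1 share one partition, the groups of 3 and 2 share the other. All messages with the same key stay together, and each partition is returned in offset order.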