Class: Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced
- Inherits: Base
- Inheritance chain: Object, Base, Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced
- Defined in: lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb
Overview
Balanced distributor that groups messages by partition key, assigns the largest groups first to the least-loaded virtual partition, and preserves message order (by offset) within each resulting group.
Instance Method Summary
- #call(messages) ⇒ Hash<Integer, Array<Karafka::Messages::Message>>
  Returns a hash with group ids as keys and message groups as values.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::VirtualPartitions::Distributors::Base
Instance Method Details
#call(messages) ⇒ Hash<Integer, Array<Karafka::Messages::Message>>
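Returns a hash whose keys are group ids (virtual partition indexes) and whose values are the messages assigned to each group, sorted by offset. Groups that receive no messages are omitted.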
# File 'lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb', line 17

def call(messages)
  # Group messages by partition key
  key_groupings = messages.group_by { |msg| config.partitioner.call(msg) }

  worker_loads = Array.new(config.max_partitions, 0)
  worker_assignments = Array.new(config.max_partitions) { [] }

  # Sort keys by workload in descending order
  sorted_keys = key_groupings.keys.sort_by { |key| -key_groupings[key].size }

  # Assign each key to the worker with the least current load
  sorted_keys.each do |key|
    # Find worker with minimum current load
    min_load_worker = worker_loads.each_with_index.min_by { |load, _| load }[1]
    messages = key_groupings[key]

    # Assign this key to that worker
    worker_assignments[min_load_worker] += messages
    worker_loads[min_load_worker] += messages.size
  end

  # Combine messages for each worker and sort by offset
  worker_assignments
    .each_with_index
    .reject { |group_messages, _| group_messages.empty? }
    .map! { |group_messages, index| [index, group_messages.sort_by!(&:offset)] }
    .to_h
end
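Example: the greedy assignment performed by #call can be tried outside of Karafka with a minimal standalone sketch. It is an illustration under stated assumptions, not the library's code: the Message struct and the balanced_distribute method are hypothetical stand-ins, and max_partitions and partitioner are passed as arguments here instead of being read from config.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message (only the fields used here)
Message = Struct.new(:key, :offset)

# Hypothetical helper: same greedy "largest group first" idea as #call,
# with settings passed explicitly instead of read from config
def balanced_distribute(messages, max_partitions:, partitioner:)
  # Group messages by partition key
  groups = messages.group_by { |msg| partitioner.call(msg) }

  loads = Array.new(max_partitions, 0)
  assignments = Array.new(max_partitions) { [] }

  # Place the largest groups first, each on the currently least-loaded slot
  groups.values.sort_by { |group| -group.size }.each do |group|
    target = loads.each_with_index.min_by { |load, _| load }[1]
    assignments[target].concat(group)
    loads[target] += group.size
  end

  # Drop empty slots and restore offset order within each slot
  assignments
    .each_with_index
    .reject { |group, _| group.empty? }
    .to_h { |group, index| [index, group.sort_by(&:offset)] }
end

# Ten messages over four keys with group sizes 4, 3, 2 and 1
keys = %w[a a a a b b b c c d]
messages = keys.each_with_index.map { |key, offset| Message.new(key, offset) }

result = balanced_distribute(
  messages,
  max_partitions: 2,
  partitioner: ->(msg) { msg.key }
)

result.each { |id, group| puts "#{id}: #{group.map(&:offset).inspect}" }
```

With two slots, the groups of sizes 4 and 1 end up together and the groups of sizes 3 and 2 end up together, so each slot holds five messages. All messages sharing a key stay in the same slot, which is what keeps per-key ordering intact.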