Class: Karafka::Pro::Cli::ParallelSegments::Distribute
Defined in: lib/karafka/pro/cli/parallel_segments/distribute.rb
Overview
This command does not remove the original consumer group from Kafka. It is kept as a backup, and users can remove it themselves.
Kafka cannot apply these offset changes atomically, which is why we first collect all the data and run the needed validations before applying any offsets.
A command that makes it easier to migrate from regular consumer groups to parallel segments consumer groups by automatically distributing offsets based on the “normal” consumer group in use.
It takes the segments' origin consumer group offsets for a given set of topics and distributes them onto the parallel segments consumer groups, so they can pick up where the origin group left off.
To make sure users do not accidentally “re-distribute” offsets from the original consumer group after the parallel consumer groups have had offsets assigned and started working, we check whether the parallel groups have any offsets. If they do, we halt unless forced.
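The collect, validate, then apply order described above can be illustrated with a minimal sketch. It is not Karafka's implementation: `plan`, `distribute_all`, `SegmentsAlreadyStartedError`, the group names, and the in-memory `store` are all hypothetical, and it assumes that each segment group simply receives a copy of the origin group's offsets.

```ruby
# Simplified, hypothetical model of the flow; this is not Karafka code.
# Offsets are plain hashes shaped like { "topic" => { partition => offset } }.

class SegmentsAlreadyStartedError < StandardError; end

# Phase 1: compute what would be written for one origin group.
# `segments` maps each segment group name to its currently committed offsets
# (an empty hash means nothing has been committed yet).
def plan(origin_offsets, segments, force: false)
  started = segments.reject { |_name, offsets| offsets.empty? }.keys

  # Halt when any segment group already has offsets, unless forced
  if started.any? && !force
    raise SegmentsAlreadyStartedError, "offsets already present in: #{started.join(', ')}"
  end

  # Assumption: each segment group starts from a copy of the origin offsets
  segments.keys.map do |name|
    { group: name, offsets: origin_offsets.transform_values(&:dup) }
  end
end

# Runs phase 1 for every origin group and only then performs the writes.
# `store` stands in for the offsets committed in Kafka.
def distribute_all(groups, store, force: false)
  plans = groups.flat_map do |group|
    plan(group[:origin_offsets], group[:segments], force: force)
  end

  # Phase 2: reached only when every group passed validation
  plans.each { |entry| store[entry[:group]] = entry[:offsets] }
  store
end
```

Because every plan is built before the first write, a validation failure in any group leaves `store` untouched, which is the property the two-phase order is meant to provide when no atomic operation is available.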
Instance Method Summary
#call ⇒ Object
Runs the distribution process.
Methods inherited from Base
Methods included from Helpers::Colorize
Constructor Details
This class inherits a constructor from Karafka::Pro::Cli::ParallelSegments::Base
Instance Method Details
#call ⇒ Object
Runs the distribution process.
```ruby
# File 'lib/karafka/pro/cli/parallel_segments/distribute.rb', line 29

def call
  puts 'Starting parallel segments distribution...'

  segments_count = applicable_groups.size

  if segments_count.zero?
    puts "#{red('No')} consumer groups with parallel segments configuration found"

    return
  end

  puts(
    "Found #{green(segments_count)} consumer groups with parallel segments configuration"
  )

  distributions = []

  # Collect and validate every group first; nothing is written in this loop
  applicable_groups.each do |segment_origin, segments|
    puts
    puts "Collecting group #{yellow(segment_origin)} details..."
    offsets = collect_offsets(segment_origin, segments)

    # Validation is skipped when the force option is present
    unless options.key?(:force)
      puts "Validating group #{yellow(segment_origin)} parallel segments..."
      validate!(offsets, segments)
    end

    puts "Distributing group #{yellow(segment_origin)} offsets..."
    distributions += distribute(offsets, segments)
  end

  # Apply offsets only after all groups have been collected and validated
  distributions.each do |distribution|
    apply(distribution)
  end

  puts
  puts "Distribution completed #{green('successfully')}!"
end
```
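One detail of the `key?(:force)` check in the method above is worth spelling out: it tests whether the key is present, not whether its value is truthy. The sketch below assumes the options object behaves like a plain Ruby Hash of parsed flags; `validation_skipped?` is a hypothetical helper written only for illustration.

```ruby
# Assumption: the options behave like a Hash built from parsed CLI flags.
# `validation_skipped?` is a hypothetical helper, not part of Karafka.
def validation_skipped?(options)
  # Mirrors the `unless ...key?(:force)` guard: presence of the key is enough
  options.key?(:force)
end

validation_skipped?({})                # key absent: validation runs
validation_skipped?({ force: true })   # key present: validation is skipped
validation_skipped?({ force: false })  # key present: validation is still skipped
```

Under that assumption, passing the force flag in any form bypasses the safety check, so it is best reserved for cases where overwriting the segment groups' offsets is intended.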