Class: Karafka::Pro::Cli::ParallelSegments::Collapse
- Defined in:
- lib/karafka/pro/cli/parallel_segments/collapse.rb
Overview
Note:
Running this can cause some double processing if the final offsets of the parallel segments are not aligned.
Note:
This will not remove the parallel segments consumer groups. Please use the Admin API if you want them to be removed.
Takes the committed offset of each parallel segment for each topic and records them back onto the segment origin consumer group. Without --force it will raise an error on conflicts. With --force it will take the lowest possible offset for each topic partition as the baseline.
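The resolution rule described above can be sketched in plain Ruby. This is a minimal illustration, not Karafka's implementation: `collapse_offsets`, `OffsetsConflictError`, and the nested-hash input shape are hypothetical, and the sketch assumes that a "conflict" means the segments committed different offsets for the same topic partition.

```ruby
# Hypothetical sketch, not Karafka code. All names below are illustrative.
# Input shape (assumed): { "segment-group" => { "topic" => { partition => offset } } }
class OffsetsConflictError < StandardError; end

def collapse_offsets(offsets_by_segment, force: false)
  # Gather the committed offsets of every segment per [topic, partition]
  grouped = Hash.new { |hash, key| hash[key] = [] }

  offsets_by_segment.each_value do |topics|
    topics.each do |topic, partitions|
      partitions.each do |partition, offset|
        grouped[[topic, partition]] << offset
      end
    end
  end

  grouped.each_with_object({}) do |((topic, partition), offsets), result|
    # Without force, misaligned offsets are treated as a conflict
    if offsets.uniq.size > 1 && !force
      raise OffsetsConflictError, "#{topic}/#{partition}: segments disagree (#{offsets.inspect})"
    end

    # The lowest offset is the baseline: nothing is skipped, but messages that
    # some segments already handled above it may be processed again
    (result[topic] ||= {})[partition] = offsets.min
  end
end
```

Taking the minimum is what makes the forced variant safe against skipped messages, and it is also why the double-processing note above applies when offsets are not aligned.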
Instance Method Summary
- #call ⇒ Object
  Runs the collapse operation.
Methods inherited from Base
Methods included from Helpers::Colorize
Constructor Details
This class inherits a constructor from Karafka::Pro::Cli::ParallelSegments::Base
Instance Method Details
#call ⇒ Object
Runs the collapse operation
# File 'lib/karafka/pro/cli/parallel_segments/collapse.rb', line 22

def call
  puts 'Starting parallel segments collapse...'

  segments_count = applicable_groups.size

  if segments_count.zero?
    puts "#{red('No')} consumer groups with parallel segments configuration found"

    return
  end

  puts(
    "Found #{green(segments_count)} consumer groups with parallel segments configuration"
  )

  collapses = []

  applicable_groups.each do |segment_origin, segments|
    puts
    puts "Collecting group #{yellow(segment_origin)} details..."
    offsets = collect_offsets(segment_origin, segments)

    unless options.key?(:force)
      puts
      puts "Validating offsets positions for #{yellow(segment_origin)} consumer group..."
      validate!(offsets, segment_origin)
    end

    puts
    puts "Computing collapsed offsets for #{yellow(segment_origin)} consumer group..."
    collapses << collapse(offsets, segments)
  end

  collapses.each do |collapse|
    apply(collapse)
  end

  puts
  puts "Collapse completed #{green('successfully')}!"
end
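In the listing above, #call collects every collapse before applying any of them. One consequence is that a validation error raised for any group prevents offsets from being written for all groups. The sketch below illustrates that two-phase shape. `collapse_all`, the `applied` hash, and the use of `ArgumentError` are hypothetical stand-ins and do not reflect Karafka's internals.

```ruby
# Hypothetical stand-ins for the collect/validate/compute/apply steps; not Karafka code.
# groups: { "origin-group" => [offset_of_segment_0, offset_of_segment_1, ...] }
def collapse_all(groups, applied:, force: false)
  # Phase 1: validate and compute a plan for every group before touching state
  plans = groups.map do |origin, segment_offsets|
    # Validation is skipped under force, similar to the `unless` guard in #call
    if !force && segment_offsets.uniq.size > 1
      raise ArgumentError, "offsets not aligned for #{origin}"
    end

    [origin, segment_offsets.min]
  end

  # Phase 2: reached only when every group produced a plan
  plans.each { |origin, offset| applied[origin] = offset }
end
```

For example, calling `collapse_all({ "a" => [5, 5], "b" => [3, 9] }, applied: {})` raises for group "b" and leaves the `applied` hash empty, even though group "a" was aligned.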