Class: Karafka::Pro::Cli::ParallelSegments::Distribute

Inherits:
Base
  • Object
Defined in:
lib/karafka/pro/cli/parallel_segments/distribute.rb

Overview

Note:

This command does not remove the original consumer group from Kafka. It is kept as a backup, and users can remove it themselves.

Note:

Kafka has no atomic operations, which is why we first collect all the data and run the needed validations before applying any offsets.

Command that makes it easier for users to migrate from regular consumer groups to parallel segments consumer groups by automatically distributing offsets based on the “normal” consumer group in use.

Takes the offsets of the segments' origin consumer group for a given set of topics and distributes them onto the parallel segments consumer groups, so they can pick up where the origin group left off.
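The distribution step can be pictured with plain hashes. The following is a minimal sketch, not Karafka's implementation: it assumes every segment group receives its own copy of the origin group's offsets, and the method name `distribute_offsets`, the data shapes, and the group names are all hypothetical.

```ruby
# Hypothetical data shape (not Karafka's internal structure):
#   origin_offsets: { topic => { partition => offset } }
def distribute_offsets(origin_offsets, segment_group_ids)
  segment_group_ids.to_h do |group_id|
    # Copy the per-topic partition hashes so each segment group owns its data
    [group_id, origin_offsets.transform_values(&:dup)]
  end
end

# Illustrative values only
origin_offsets = { 'orders' => { 0 => 120, 1 => 98 } }
segment_groups = %w[orders-group-segment-0 orders-group-segment-1]

plan = distribute_offsets(origin_offsets, segment_groups)
plan.each { |group_id, offsets| puts "#{group_id}: #{offsets}" }
```

Each entry of the resulting plan pairs one segment group with the offsets it would start from.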

To make sure users do not accidentally “re-distribute” offsets from the original consumer group after the parallel consumer groups have had offsets assigned and started to work, we check whether the parallel groups already have any offsets. If they do, we halt unless forced.
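The safety check described above could look roughly like this. It is a sketch under stated assumptions: the error class, the method name `ensure_segments_untouched!`, and the hash layout are hypothetical and only illustrate the "halt unless forced" rule.

```ruby
# Hypothetical error class, for illustration only
class SegmentsAlreadyHaveOffsetsError < StandardError; end

# Hypothetical data shape:
#   segment_offsets: { group_id => { topic => { partition => offset } } }
def ensure_segments_untouched!(segment_offsets, force: false)
  # A forced run skips the check entirely
  return if force

  # A group counts as "used" when any of its topics has a stored offset
  used = segment_offsets.select do |_group_id, topics|
    topics.values.any? { |partitions| !partitions.empty? }
  end

  return if used.empty?

  raise SegmentsAlreadyHaveOffsetsError,
        "Segment groups already have offsets: #{used.keys.join(', ')}"
end

# Illustrative values only
already_used = { 'orders-group-segment-0' => { 'orders' => { 0 => 10 } } }

begin
  ensure_segments_untouched!(already_used)
rescue SegmentsAlreadyHaveOffsetsError => e
  puts "Halted: #{e.message}"
end

# With force enabled, the same input passes the check
ensure_segments_untouched!(already_used, force: true)
```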

Instance Method Summary

Methods inherited from Base

#initialize

Methods included from Helpers::Colorize

#green, #grey, #red, #yellow

Constructor Details

This class inherits a constructor from Karafka::Pro::Cli::ParallelSegments::Base

Instance Method Details

#call ⇒ Object

Runs the distribution process



# File 'lib/karafka/pro/cli/parallel_segments/distribute.rb', line 29

def call
  puts 'Starting parallel segments distribution...'

  segments_count = applicable_groups.size

  if segments_count.zero?
    puts "#{red('No')} consumer groups with parallel segments configuration found"

    return
  end

  puts(
    "Found #{green(segments_count)} consumer groups with parallel segments configuration"
  )

  distributions = []

  applicable_groups.each do |segment_origin, segments|
    puts
    puts "Collecting group #{yellow(segment_origin)} details..."
    offsets = collect_offsets(segment_origin, segments)

    unless options.key?(:force)
      puts "Validating group #{yellow(segment_origin)} parallel segments..."
      validate!(offsets, segments)
    end

    puts "Distributing group #{yellow(segment_origin)} offsets..."
    distributions += distribute(offsets, segments)
  end

  distributions.each do |distribution|
    apply(distribution)
  end

  puts
  puts "Distribution completed #{green('successfully')}!"
end
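
The method above validates every group before it applies any offsets, which matches the note that Kafka offers no atomic operations. A stripped-down sketch of that ordering follows. The helper `plan_then_apply` and the lambdas are hypothetical stand-ins for the real offset reads and writes, not part of Karafka.

```ruby
# Hypothetical helper: the lambdas stand in for real validation and offset writes
def plan_then_apply(groups, validate:, apply:)
  planned = []

  # Phase 1: read-only. An exception here aborts before a single write happens.
  groups.each do |group|
    validate.call(group)
    planned << group
  end

  # Phase 2: writes start only once every group has passed validation.
  planned.each { |group| apply.call(group) }
end

applied = []
validate = ->(group) { raise ArgumentError, "#{group} already has offsets" if group == 'b' }
apply = ->(group) { applied << group }

begin
  plan_then_apply(%w[a b c], validate: validate, apply: apply)
rescue ArgumentError => e
  puts "Aborted: #{e.message}"
end

puts "Applied groups: #{applied.inspect}"
```

Because group `b` fails validation, nothing is applied, not even group `a`, which had already passed. This does not make the writes themselves atomic: a failure during the second phase could still leave some groups updated and others not.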