Module: Karafka::Pro::Routing::Features::ParallelSegments::Builder

Defined in:
lib/karafka/pro/routing/features/parallel_segments/builder.rb

Overview

Extensions to the routing builder that expand a consumer group with parallel segments enabled into multiple segment consumer groups

Instance Method Summary collapse

Instance Method Details

#consumer_group(group_id, &block) ⇒ Object

Builds and saves given consumer group

Parameters:

  • group_id (String, Symbol)

    name for consumer group

  • block (Proc)

    proc that should be executed in the proxy context



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/karafka/pro/routing/features/parallel_segments/builder.rb', line 16

# Builds and saves given consumer group. When the block enables parallel segments on a
# group that is not yet registered, one consumer group per segment is registered instead
# of the single group itself.
#
# @param group_id [String, Symbol] name for consumer group
# @param block [Proc] proc that should be executed in the proxy context
def consumer_group(group_id, &block)
  # Re-opening an already registered consumer group must not change its parallel setup,
  # so we hand it over to the default implementation untouched
  return super if find { |existing| existing.name == group_id.to_s }

  # The block is first evaluated against a throwaway consumer group that is never added
  # to the routing. We use it only to learn whether parallel segments were enabled.
  probe = Karafka::Routing::Proxy.new(
    ::Karafka::Routing::ConsumerGroup.new(group_id.to_s),
    &block
  ).target
  segments = probe.parallel_segments

  # If parallel segments are not active we go with the default flow
  return super unless segments.active?

  # Instead of the requested group we register one group per segment. Each is named from
  # the group id, the merge key and the segment index, and is built with the same block.
  segments.count.times do |index|
    segment_name = [group_id, segments.merge_key, index].join
    segment_group = Karafka::Routing::ConsumerGroup.new(segment_name)
    self << Karafka::Routing::Proxy.new(segment_group, &block).target
  end
end