Module: Karafka::Pro::Routing::Features::ParallelSegments::Builder
- Defined in:
- lib/karafka/pro/routing/features/parallel_segments/builder.rb
Overview
Expansions for the routing builder
Instance Method Summary
- #consumer_group(group_id, &block) ⇒ Object
Builds and saves the given consumer group.
Instance Method Details
#consumer_group(group_id, &block) ⇒ Object
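Builds and saves the given consumer group. If a group with this name already exists, it is re-opened through the default flow and its parallel setup is left unchanged. Otherwise, when parallel segments are active for the group, one consumer group per segment is built and added to the routing in place of the original group.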
# File 'lib/karafka/pro/routing/features/parallel_segments/builder.rb', line 16

def consumer_group(group_id, &block)
  consumer_group = find { |cg| cg.name == group_id.to_s }

  # Re-opening a CG should not change its parallel setup
  if consumer_group
    super
  else
    # We build a temp consumer group and a target to check if it has parallel segments
    # enabled and if so, we do not add it to the routing but instead we build the
    # appropriate number of parallel segment groups
    temp_consumer_group = ::Karafka::Routing::ConsumerGroup.new(group_id.to_s)
    temp_target = Karafka::Routing::Proxy.new(temp_consumer_group, &block).target
    config = temp_target.parallel_segments

    if config.active?
      config.count.times do |i|
        sub_name = [group_id, config.merge_key, i.to_s].join
        sub_consumer_group = Karafka::Routing::ConsumerGroup.new(sub_name)
        self << Karafka::Routing::Proxy.new(sub_consumer_group, &block).target
      end
    # If parallel segments are not active we go with the default flow
    else
      super
    end
  end
end
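The naming of the per-segment consumer groups can be illustrated in isolation. The following is a minimal sketch that does not load Karafka: `segment_group_names` is a hypothetical helper, not part of the library, and the `"-parallel-"` merge key and the count of 3 are assumed example values rather than confirmed defaults. It only mirrors the `[group_id, config.merge_key, i.to_s].join` expression used in the method above.

```ruby
# Hypothetical helper (not a Karafka API) that reproduces the naming
# expression from #consumer_group: [group_id, merge_key, i.to_s].join
def segment_group_names(group_id, merge_key, count)
  # Array#join converts each element with #to_s, so a Symbol id works too
  Array.new(count) { |i| [group_id, merge_key, i.to_s].join }
end

# Assumed example values: the merge key and count are illustrative only
names = segment_group_names(:orders, "-parallel-", 3)
puts names
```

With these assumed inputs, the sketch yields one name per segment index, starting at 0. In the real builder each such name is passed to `Karafka::Routing::ConsumerGroup.new`, and the same routing block is evaluated once for every segment group.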