Module: Karafka::Pro::Routing::Features::ParallelSegments::Topic

Defined in:
lib/karafka/pro/routing/features/parallel_segments/topic.rb

Overview

Extensions to the topic building flow related to parallel segments.

Instance Method Summary

Instance Method Details

#initialize(*args) ⇒ Object

Injects the parallel segments filter as the first filter while each topic is being built, provided parallel segments are enabled for the consumer group. The filter variant depends on whether the topic uses manual offset management.

Parameters:

  • args (Object)

    anything accepted by the topic initializer



# File 'lib/karafka/pro/routing/features/parallel_segments/topic.rb', line 17

def initialize(*args)
  super

  return unless consumer_group.parallel_segments?

  builder = lambda do |topic, _partition|
    mom = topic.manual_offset_management?

    # We have two filters for mom and non-mom scenario not to mix this logic
    filter_scope = Karafka::Pro::Processing::ParallelSegments::Filters
    filter_class = mom ? filter_scope::Mom : filter_scope::Default

    filter_class.new(
      segment_id: consumer_group.segment_id,
      partitioner: consumer_group.parallel_segments.partitioner,
      reducer: consumer_group.parallel_segments.reducer
    )
  end

  filter(builder)
end
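
The selection logic above can be illustrated outside of Karafka with a minimal sketch. Everything in it is a hypothetical stand-in introduced for illustration: the `Filters` module, `FakeTopic`, and the `SEGMENT_ID`, `PARTITIONER`, `REDUCER` and `BUILDER` constants are not part of the Karafka API, and the example partitioner and reducer are arbitrary. It only shows the shape of the pattern: a builder lambda that receives a topic and a partition, picks one of two filter classes depending on manual offset management, and returns a new filter instance.

```ruby
# Hypothetical stand-ins for the two filter classes; not the real Karafka ones.
module Filters
  Base = Struct.new(:segment_id, :partitioner, :reducer, keyword_init: true)

  class Default < Base; end
  class Mom < Base; end
end

# Hypothetical topic double exposing only the predicate the builder needs.
FakeTopic = Struct.new(:mom) do
  def manual_offset_management?
    mom
  end
end

# Assumed values that would normally come from the consumer group settings.
SEGMENT_ID = 1
PARTITIONER = ->(message) { message[:key] }
REDUCER = ->(key) { key.to_s.sum % 2 }

# Builder with the same arity as the one in #initialize: (topic, partition).
BUILDER = lambda do |topic, _partition|
  filter_class = topic.manual_offset_management? ? Filters::Mom : Filters::Default

  filter_class.new(
    segment_id: SEGMENT_ID,
    partitioner: PARTITIONER,
    reducer: REDUCER
  )
end

default_filter = BUILDER.call(FakeTopic.new(false), 0)
mom_filter = BUILDER.call(FakeTopic.new(true), 0)
```

Passing a lambda rather than an instance means the filter is created on demand for each topic and partition pair, so a filter's state is not shared between partitions.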