Module: Karafka::Pro::Routing::Features::ParallelSegments::Topic
- Defined in:
- lib/karafka/pro/routing/features/parallel_segments/topic.rb
Overview
Parallel segments related expansions to the topic building flow
Instance Method Summary collapse
-
#initialize(*args) ⇒ Object
Injects the parallel segments filter as the first filter during building of each of the topics in case parallel segments are enabled.
Instance Method Details
#initialize(*args) ⇒ Object
Injects the parallel segments filter as the first filter during building of each of the topics in case parallel segments are enabled.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/karafka/pro/routing/features/parallel_segments/topic.rb', line 17 def initialize(*args) super return unless consumer_group.parallel_segments? builder = lambda do |topic, _partition| mom = topic.manual_offset_management? # We have two filters for mom and non-mom scenario not to mix this logic filter_scope = Karafka::Pro::Processing::ParallelSegments::Filters filter_class = mom ? filter_scope::Mom : filter_scope::Default filter_class.new( segment_id: consumer_group.segment_id, partitioner: consumer_group.parallel_segments.partitioner, reducer: consumer_group.parallel_segments.reducer ) end filter(builder) end |