Module: Karafka::Pro::Routing::Features::ParallelSegments::ConsumerGroup
- Defined in:
- lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb
Overview
Parallel segments are defined on the consumer group (since it creates many), thus we define them on the consumer group. This module adds extra methods needed there to make it work
Instance Method Summary collapse
-
#parallel_segments ⇒ Config
Parallel segments config.
-
#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-') ⇒ Object
Allows setting parallel segments configuration.
-
#parallel_segments? ⇒ Boolean
Are parallel segments active.
-
#segment_id ⇒ Integer
Id of the segment (0 or bigger) or -1 if parallel segments are not active.
-
#segment_origin ⇒ String
Original segment consumer group name.
-
#to_h ⇒ Hash
Consumer group setup with the parallel segments definition in it.
Instance Method Details
#parallel_segments ⇒ Config
Returns parallel segments config.
16 17 18 19 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 16 def parallel_segments # We initialize it as disabled if not configured by the user public_send(:parallel_segments=, count: 1) end |
#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-') ⇒ Object
This method is an assignor but the API is actually via the #parallel_segments
method. Our Routing::Proxy
normalizes that the way we want to have it exposed for the end users.
Allows setting parallel segments configuration
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 33 def parallel_segments=( count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-' ) @parallel_segments ||= Config.new( active: count > 1, count: count, partitioner: partitioner, reducer: reducer || ->(parallel_key) { parallel_key.to_s.sum % count }, merge_key: merge_key ) end |
#parallel_segments? ⇒ Boolean
Returns are parallel segments active.
49 50 51 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 49 def parallel_segments? parallel_segments.active? end |
#segment_id ⇒ Integer
Returns id of the segment (0 or bigger) or -1 if parallel segments are not active.
55 56 57 58 59 60 61 62 63 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 55 def segment_id return @segment_id if @segment_id @segment_id = if parallel_segments? name.split(parallel_segments.merge_key).last.to_i else -1 end end |
#segment_origin ⇒ String
Returns original segment consumer group name.
66 67 68 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 66 def segment_origin name.split(parallel_segments.merge_key).first end |
#to_h ⇒ Hash
Returns consumer group setup with the parallel segments definition in it.
71 72 73 74 75 76 77 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 71 def to_h super.merge( parallel_segments: parallel_segments.to_h.merge( segment_id: segment_id ) ).freeze end |