Module: Karafka::Pro::Routing::Features::ParallelSegments::ConsumerGroup

Defined in:
lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb

Overview

Parallel segments are defined on the consumer group (since it creates many), thus we define them on the consumer group. This module adds extra methods needed there to make it work

Instance Method Summary collapse

Instance Method Details

#parallel_segmentsConfig

Returns parallel segments config.

Returns:

  • (Config)

    parallel segments config



16
17
18
19
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 16

def parallel_segments
  # We initialize it as disabled if not configured by the user
  public_send(:parallel_segments=, count: 1)
end

#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-') ⇒ Object

Note:

This method is an assignor but the API is actually via the #parallel_segments method. Our Routing::Proxy normalizes that the way we want to have it exposed for the end users.

Allows setting parallel segments configuration

Parameters:

  • count (Integer) (defaults to: 1)

    number of parallel segments (number of parallel consumer groups that will be created)

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • reducer (nil, #call) (defaults to: nil)

    reducer for parallel key. It allows for using a custom reducer to achieve enhanced parallelization when the default reducer is not enough.

  • merge_key (String) (defaults to: '-parallel-')

    key used to build the parallel segment consumer groups



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 33

def parallel_segments=(
  count: 1,
  partitioner: nil,
  reducer: nil,
  merge_key: '-parallel-'
)
  @parallel_segments ||= Config.new(
    active: count > 1,
    count: count,
    partitioner: partitioner,
    reducer: reducer || ->(parallel_key) { parallel_key.to_s.sum % count },
    merge_key: merge_key
  )
end

#parallel_segments?Boolean

Returns are parallel segments active.

Returns:

  • (Boolean)

    are parallel segments active



49
50
51
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 49

def parallel_segments?
  parallel_segments.active?
end

#segment_idInteger

Returns id of the segment (0 or bigger) or -1 if parallel segments are not active.

Returns:

  • (Integer)

    id of the segment (0 or bigger) or -1 if parallel segments are not active



55
56
57
58
59
60
61
62
63
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 55

def segment_id
  return @segment_id if @segment_id

  @segment_id = if parallel_segments?
                  name.split(parallel_segments.merge_key).last.to_i
                else
                  -1
                end
end

#segment_originString

Returns original segment consumer group name.

Returns:

  • (String)

    original segment consumer group name



66
67
68
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 66

def segment_origin
  name.split(parallel_segments.merge_key).first
end

#to_hHash

Returns consumer group setup with the parallel segments definition in it.

Returns:

  • (Hash)

    consumer group setup with the parallel segments definition in it



71
72
73
74
75
76
77
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 71

def to_h
  super.merge(
    parallel_segments: parallel_segments.to_h.merge(
      segment_id: segment_id
    )
  ).freeze
end