Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic

Defined in:
lib/karafka/pro/routing/features/virtual_partitions/topic.rb

Overview

Topic extensions that make it possible to manage the virtual partitions feature.

Instance Method Details

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the virtual partitions namespace settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus the virtual partitions namespace settings



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 49

def to_h
  super.merge(
    virtual_partitions: virtual_partitions.to_h
  ).freeze
end
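
The merge-then-freeze pattern used by #to_h can be illustrated in isolation. This is a minimal sketch, not Karafka code: `BaseTopic`, `VirtualPartitionsSketch`, `SketchTopic`, and the sample values are hypothetical stand-ins chosen only to show how a feature module can extend the parent `to_h` through `super`.

```ruby
# Hypothetical stand-in for the base routing topic (not the Karafka class).
class BaseTopic
  def to_h
    { name: 'orders' }
  end
end

# Hypothetical module mimicking how a feature extends the topic's #to_h.
module VirtualPartitionsSketch
  def virtual_partitions
    # Assumed sample settings, for illustration only
    { active: true, max_partitions: 5 }
  end

  def to_h
    # Add the feature settings under their own key and freeze the result
    super.merge(virtual_partitions: virtual_partitions).freeze
  end
end

class SketchTopic < BaseTopic
  include VirtualPartitionsSketch
end

topic_hash = SketchTopic.new.to_h
puts topic_hash.frozen?
puts topic_hash[:virtual_partitions][:max_partitions]
```

Note that `freeze` is shallow: it prevents adding or replacing keys on the returned hash, but nested objects are frozen only if they were frozen themselves.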

#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions

Returns the virtual partitions configuration. This method sets the virtual partitions details during the routing configuration and later allows retrieving them.

Parameters:

  • max_partitions (Integer) (defaults to: Karafka::App.config.concurrency)

    maximum number of virtual partitions that can come out of a single distribution flow. When set higher than the number of Karafka worker threads, it creates more work than there are workers. When set lower, it leaves spare resources to process other things in parallel.

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • offset_metadata_strategy (Symbol) (defaults to: :current)

    how the metadata should be matched to the offset. :exact selects the metadata that matches the offset, and :current selects the most recently reported metadata.

  • reducer (nil, #call) (defaults to: nil)

    reducer for the virtual partitions key. A custom reducer can be used to achieve better parallelization when the default reducer is not enough.

Returns:

  • (VirtualPartitions)

    the virtual partitions configuration, set during the routing configuration and retrievable afterwards



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 25

def virtual_partitions(
  max_partitions: Karafka::App.config.concurrency,
  partitioner: nil,
  offset_metadata_strategy: :current,
  reducer: nil
)
  @virtual_partitions ||= Config.new(
    active: !partitioner.nil?,
    max_partitions: max_partitions,
    partitioner: partitioner,
    offset_metadata_strategy: offset_metadata_strategy,
    # If no reducer provided, we use this one. It just runs a modulo on the sum of
    # a stringified version, providing fairly good distribution.
    reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions }
  )
end
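
To make the default reducer in the source above more concrete, here is a small sketch of how a partitioner and a reducer could combine to assign messages to virtual partitions. It is an illustration under assumptions, not Karafka's internal flow: the messages are plain hashes, the `:user_id` key, the sample `max_partitions` value, and the CRC32-based `custom_reducer` are all hypothetical.

```ruby
require 'zlib'

# Assumed sample value; per the signature, the real default is the app concurrency.
max_partitions = 4

# Same expression as the default reducer shown in the source.
default_reducer = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

# Hypothetical partitioner: extracts the virtual key from a message-like hash.
partitioner = ->(message) { message[:user_id] }

# Hypothetical messages; plain hashes instead of Karafka message objects.
messages = [
  { user_id: 'a', payload: 1 },
  { user_id: 'b', payload: 2 },
  { user_id: 'a', payload: 3 },
  { user_id: 'e', payload: 4 }
]

# Group messages by the virtual partition index their key reduces to.
groups = messages.group_by do |message|
  default_reducer.call(partitioner.call(message))
end

# String#sum adds up bytes, so keys made of the same characters collide.
anagrams_collide = default_reducer.call('ab') == default_reducer.call('ba')

# Hypothetical custom reducer based on CRC32, which is sensitive to byte order.
custom_reducer = ->(virtual_key) { Zlib.crc32(virtual_key.to_s) % max_partitions }

puts groups.keys.inspect
puts anagrams_collide
puts custom_reducer.call('ab')
```

Because the default reducer only sums bytes, keys that differ just in character order always land in the same virtual partition. That is one situation where passing a custom `reducer:` may give a more even spread.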

#virtual_partitions? ⇒ Boolean

Returns whether virtual partitions are enabled for the given topic.

Returns:

  • (Boolean)

    true if virtual partitions are enabled for the given topic



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 43

def virtual_partitions?
  virtual_partitions.active?
end
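
The interaction between #virtual_partitions and #virtual_partitions? can be sketched with plain Ruby. This is a simplified illustration, not the Karafka implementation: `SketchConfig` and `MemoTopic` are hypothetical stand-ins, and the default of 5 replaces the real concurrency-based default. It shows two consequences of the source: the feature counts as active only when a partitioner was given, and the `||=` memoization means the first call fixes the settings.

```ruby
# Hypothetical stand-in for the feature's Config object.
SketchConfig = Struct.new(:active, :max_partitions, :partitioner, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical topic mirroring the memoized accessor from the source.
class MemoTopic
  def virtual_partitions(max_partitions: 5, partitioner: nil)
    # Built once; later calls return the same object and ignore new arguments
    @virtual_partitions ||= SketchConfig.new(
      active: !partitioner.nil?,
      max_partitions: max_partitions,
      partitioner: partitioner
    )
  end

  def virtual_partitions?
    virtual_partitions.active?
  end
end

# No partitioner given, so the feature stays inactive.
plain = MemoTopic.new
plain_enabled = plain.virtual_partitions?

# A partitioner given on the first call activates the feature.
configured = MemoTopic.new
configured.virtual_partitions(partitioner: ->(message) { message[:user_id] })
configured_enabled = configured.virtual_partitions?

# A second call with different arguments does not change the stored settings.
later_max = configured.virtual_partitions(max_partitions: 99).max_partitions

puts plain_enabled
puts configured_enabled
puts later_max
```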