Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic

Defined in:
lib/karafka/pro/routing/features/virtual_partitions/topic.rb

Overview

Topic extensions to be able to manage virtual partitions feature

Instance Method Summary collapse

Instance Method Details

#to_hHash

Returns topic with all its native configuration options plus manual offset management namespace settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus manual offset management namespace settings



55
56
57
58
59
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 55

def to_h
  super.merge(
    virtual_partitions: virtual_partitions.to_h
  ).freeze
end

#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil, distribution: :consistent) ⇒ VirtualPartitions

Returns method that allows to set the virtual partitions details during the routing configuration and then allows to retrieve it.

Parameters:

  • max_partitions (Integer) (defaults to: Karafka::App.config.concurrency)

    max number of virtual partitions that can come out of the single distribution flow. When set to more than the Karafka threading, will create more work than workers. When less, can ensure we have spare resources to process other things in parallel.

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • offset_metadata_strategy (Symbol) (defaults to: :current)

    how we should match the metadata for the offset. :exact will match the offset matching metadata and :current will select the most recently reported metadata

  • reducer (nil, #call) (defaults to: nil)

    reducer for VPs key. It allows for using a custom reducer to achieve enhanced parallelization when the default reducer is not enough.

  • distribution (Symbol) (defaults to: :consistent)

    the strategy to use for virtual partitioning. Can be either :consistent or :balanced. The :balanced strategy ensures balanced distribution of work across available workers while maintaining message order within groups.

Returns:

  • (VirtualPartitions)

    method that allows to set the virtual partitions details during the routing configuration and then allows to retrieve it



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 29

def virtual_partitions(
  max_partitions: Karafka::App.config.concurrency,
  partitioner: nil,
  offset_metadata_strategy: :current,
  reducer: nil,
  distribution: :consistent
)
  @virtual_partitions ||= Config.new(
    active: !partitioner.nil?,
    max_partitions: max_partitions,
    partitioner: partitioner,
    offset_metadata_strategy: ,
    # If no reducer provided, we use this one. It just runs a modulo on the sum of
    # a stringified version, providing fairly good distribution.
    reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions },
    distribution: distribution
  )
end

#virtual_partitions?Boolean

Returns are virtual partitions enabled for given topic.

Returns:

  • (Boolean)

    are virtual partitions enabled for given topic



49
50
51
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 49

def virtual_partitions?
  virtual_partitions.active?
end