Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic

Defined in:
lib/karafka/pro/routing/features/virtual_partitions/topic.rb

Overview

Topic extensions for managing the virtual partitions feature.

Instance Method Details

#initialize ⇒ Object

Calls the parent class initializer and then sets the @virtual_partitions instance variable to nil. The explicit initialization to nil is an optimization for Ruby's object shapes system, which improves memory layout and access performance.



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 17

def initialize(...)
  super
  @virtual_partitions = nil
end

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the virtual partitions namespace settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus the virtual partitions namespace settings



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 64

def to_h
  super.merge(
    virtual_partitions: virtual_partitions.to_h
  ).freeze
end
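
The merge-and-freeze pattern used by #to_h can be sketched in isolation. This is a minimal illustration, not Karafka code: SketchBaseTopic, SketchVirtualPartitionsTopic, SketchTopic and the hash contents are hypothetical stand-ins chosen only to show how super.merge(...).freeze behaves.

```ruby
# Hypothetical stand-in for the parent topic class; not a Karafka class.
class SketchBaseTopic
  def to_h
    { name: 'events' }
  end
end

# Hypothetical extension mirroring the merge-and-freeze pattern of #to_h.
module SketchVirtualPartitionsTopic
  def to_h
    # super returns the parent's hash; merge builds a new hash with the extra
    # namespace key, and freeze prevents later modification of that new hash.
    super.merge(
      virtual_partitions: { active: false, max_partitions: 5 }
    ).freeze
  end
end

class SketchTopic < SketchBaseTopic
  include SketchVirtualPartitionsTopic
end

topic_hash = SketchTopic.new.to_h

topic_hash.keys                         # => [:name, :virtual_partitions]
topic_hash.frozen?                      # => true
# freeze is shallow: the nested hash is frozen only if it was frozen itself.
topic_hash[:virtual_partitions].frozen? # => false
```

Because freeze is shallow, whether the nested virtual_partitions hash is also immutable depends on what the nested to_h call returns; the sketch above makes no claim about that for the real Config object.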

#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil, distribution: :consistent) ⇒ VirtualPartitions

Sets the virtual partitions details during the routing configuration and allows retrieving them afterwards.

Parameters:

  • max_partitions (Integer) (defaults to: Karafka::App.config.concurrency)

    maximum number of virtual partitions that can come out of a single distribution flow. When set higher than the number of Karafka worker threads, it creates more work units than workers. When set lower, it leaves spare resources for processing other things in parallel.

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • offset_metadata_strategy (Symbol) (defaults to: :current)

    how the metadata should be matched to the offset. :exact uses the metadata matching the given offset, while :current uses the most recently reported metadata

  • reducer (nil, #call) (defaults to: nil)

    reducer for the virtual partitions key. It allows using a custom reducer to achieve better parallelization when the default reducer is not enough.

  • distribution (Symbol) (defaults to: :consistent)

    the strategy to use for virtual partitioning. Can be either :consistent or :balanced. The :balanced strategy ensures balanced distribution of work across available workers while maintaining message order within groups.

Returns:

  • (VirtualPartitions)

    the virtual partitions configuration set during the routing configuration



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 38

def virtual_partitions(
  max_partitions: Karafka::App.config.concurrency,
  partitioner: nil,
  offset_metadata_strategy: :current,
  reducer: nil,
  distribution: :consistent
)
  @virtual_partitions ||= Config.new(
    active: !partitioner.nil?,
    max_partitions: max_partitions,
    partitioner: partitioner,
    offset_metadata_strategy: offset_metadata_strategy,
    # If no reducer provided, we use this one. It just runs a modulo on the sum of
    # a stringified version, providing fairly good distribution.
    reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions },
    distribution: distribution
  )
end
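
The default reducer shown in the listing above can be tried on its own. The sketch below copies that lambda verbatim and assumes max_partitions is 5 purely for illustration; the sample keys are made up.

```ruby
# Assumed value for illustration; in the listing this comes from the
# max_partitions keyword argument.
max_partitions = 5

# Same lambda as the default reducer: stringify the key, sum its bytes
# (String#sum) and take the result modulo max_partitions.
reducer = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

reducer.call('ab') # => 0  (97 + 98 = 195, 195 % 5 = 0)
reducer.call('ba') # => 0  (same bytes, so the same bucket)
reducer.call(42)   # => 2  ('4' = 52, '2' = 50, 102 % 5 = 2)

# Every result falls within 0...max_partitions.
buckets = %w[a b c d e f g].map { |key| reducer.call(key) }
buckets.all? { |bucket| (0...max_partitions).cover?(bucket) } # => true
```

Since the sum ignores byte order, keys that are permutations of each other always land in the same virtual partition. That is one case where passing a custom reducer may give a better spread.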

#virtual_partitions? ⇒ Boolean

Returns whether virtual partitions are enabled for the given topic.

Returns:

  • (Boolean)

    true if virtual partitions are enabled for the given topic, false otherwise



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 58

def virtual_partitions?
  virtual_partitions.active?
end
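
How the active flag and the ||= memoization interact can be sketched with plain Ruby objects. MemoConfig and MemoTopic below are hypothetical stand-ins, not Karafka classes; they only reproduce the shape of the methods listed above, under the assumption that the configuration object exposes active?.

```ruby
# Hypothetical stand-in for the configuration object.
MemoConfig = Struct.new(:active, :max_partitions, :partitioner, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical stand-in for a topic using the same memoized accessor pattern.
class MemoTopic
  def initialize
    @virtual_partitions = nil
  end

  def virtual_partitions(max_partitions: 5, partitioner: nil)
    # Only the first call builds the config; later calls return the same object
    # and their arguments are ignored.
    @virtual_partitions ||= MemoConfig.new(
      active: !partitioner.nil?,
      max_partitions: max_partitions,
      partitioner: partitioner
    )
  end

  def virtual_partitions?
    virtual_partitions.active?
  end
end

# Without a partitioner, the feature is reported as inactive.
plain = MemoTopic.new
plain.virtual_partitions? # => false

# With a partitioner given on the first call, it is active.
partitioned = MemoTopic.new
partitioned.virtual_partitions(partitioner: ->(message) { message }, max_partitions: 3)
partitioned.virtual_partitions? # => true

# Later calls do not reconfigure: the memoized settings win.
partitioned.virtual_partitions(max_partitions: 10).max_partitions # => 3
```

One consequence of this pattern is that calling virtual_partitions? before configuring a topic memoizes an inactive configuration, so in this sketch a later call with a partitioner has no effect.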