Module: Karafka::Pro::Routing::Features::AdaptiveIterator::Topic

Defined in:
lib/karafka/pro/routing/features/adaptive_iterator/topic.rb

Overview

Topic extension that allows enabling and configuring the adaptive iterator.

Instance Method Details

#adaptive_iterator(active: false, safety_margin: 10, marking_method: :mark_as_consumed, clean_after_yielding: true) ⇒ Object

Parameters:

  • active (Boolean) (defaults to: false)

    Whether to use the automatic adaptive iterator.

  • safety_margin (Integer) (defaults to: 10)

    How large a margin, as a percentage of max.poll.interval.ms, we leave ourselves so we can still safely communicate back with Kafka, etc. With the default of 10, we stop and seek back once 90% of the time has been used and leave the remaining 10% for post-processing operations, so there is room before we hit max.poll.interval.ms. A value of 15 would stop at 85% and leave 15%.

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    Which method to use for marking messages as consumed, when marking applies.

  • clean_after_yielding (Boolean) (defaults to: true)

    Whether to clean each message after yielding it, via the cleaner API.
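
To make the safety_margin arithmetic concrete, here is a minimal sketch. It assumes safety_margin is a percentage of max.poll.interval.ms and uses 300,000 ms as an example interval. The helper processing_budget_ms is hypothetical and is not part of Karafka.

```ruby
# Hypothetical helper (not part of Karafka): how many milliseconds of
# processing fit before the safety margin is reached, assuming the margin
# is a percentage of max.poll.interval.ms.
def processing_budget_ms(max_poll_interval_ms, safety_margin)
  max_poll_interval_ms * (100 - safety_margin) / 100
end

# Example interval of 300_000 ms (an assumption for illustration).
default_budget = processing_budget_ms(300_000, 10) # => 270000
wider_budget   = processing_budget_ms(300_000, 15) # => 255000

puts "margin 10: stop after #{default_budget} ms"
puts "margin 15: stop after #{wider_budget} ms"
```

A larger margin trades processing time per batch for more headroom before the poll interval expires.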



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 31

def adaptive_iterator(
  active: false,
  safety_margin: 10,
  marking_method: :mark_as_consumed,
  clean_after_yielding: true
)
  @adaptive_iterator ||= Config.new(
    active: active,
    safety_margin: safety_margin,
    marking_method: marking_method,
    clean_after_yielding: clean_after_yielding
  )
end
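
Because the method memoizes with `||=`, the configuration built by the first call is reused by every later call, and arguments passed later are ignored. The sketch below illustrates that behavior in isolation. SketchConfig and SketchTopic are hypothetical stand-ins, and the real Config class may differ.

```ruby
# Hypothetical stand-in for the real Config class (assumption: a simple
# value object that exposes #active?).
SketchConfig = Struct.new(
  :active, :safety_margin, :marking_method, :clean_after_yielding,
  keyword_init: true
) do
  def active?
    active == true
  end
end

# Hypothetical stand-in for a topic that carries the extension.
class SketchTopic
  def initialize
    @adaptive_iterator = nil
  end

  def adaptive_iterator(
    active: false,
    safety_margin: 10,
    marking_method: :mark_as_consumed,
    clean_after_yielding: true
  )
    # Only the first call builds the config; later calls return it as-is.
    @adaptive_iterator ||= SketchConfig.new(
      active: active,
      safety_margin: safety_margin,
      marking_method: marking_method,
      clean_after_yielding: clean_after_yielding
    )
  end

  def adaptive_iterator?
    adaptive_iterator.active?
  end
end

topic  = SketchTopic.new
first  = topic.adaptive_iterator(active: true, safety_margin: 15)
second = topic.adaptive_iterator(active: false) # arguments ignored here

puts second.equal?(first)      # => true
puts second.safety_margin      # => 15
puts topic.adaptive_iterator?  # => true
```

On a fresh topic where adaptive_iterator was never called with arguments, the predicate builds the config from the defaults and therefore reports false.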

#adaptive_iterator? ⇒ Boolean

Returns whether the adaptive iterator is active for this topic. The value comes from the configured active flag, which defaults to false.

Returns:

  • (Boolean)

    Whether the adaptive iterator is active for this topic.



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 47

def adaptive_iterator?
  adaptive_iterator.active?
end

#initialize ⇒ Object

This method calls the parent class initializer and then sets the extra instance variable to nil. The explicit nil assignment is an optimization for Ruby’s object shapes system: defining the instance variable up front gives every instance the same set of instance variables, which improves memory layout and access performance.



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 17

def initialize(...)
  super
  @adaptive_iterator = nil
end

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the adaptive iterator configuration, as a frozen Hash.

Returns:

  • (Hash)

    The topic with all its native configuration options plus the adaptive iterator configuration.



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 53

def to_h
  super.merge(
    adaptive_iterator: adaptive_iterator.to_h
  ).freeze
end
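
The `super.merge(...).freeze` pattern above can be sketched in isolation as follows. PlainTopic and IteratorHashSketch are hypothetical names, the nested values are placeholders, and prepending the module is an assumption about how such an extension is typically mixed in.

```ruby
# Hypothetical extension module: adds one key on top of whatever the
# underlying #to_h returns, then freezes the merged result.
module IteratorHashSketch
  def to_h
    super.merge(
      adaptive_iterator: { active: false, safety_margin: 10 }
    ).freeze
  end
end

# Hypothetical base topic with its "native" options.
class PlainTopic
  prepend IteratorHashSketch # assumption: the extension is prepended

  def to_h
    { name: "events" }
  end
end

h = PlainTopic.new.to_h

puts h.key?(:name)               # => true
puts h.key?(:adaptive_iterator)  # => true
puts h.frozen?                   # => true

# Hash#freeze is shallow: only the outer hash is frozen here. Nested
# hashes stay mutable unless they are frozen separately.
puts h[:adaptive_iterator].frozen? # => false
```

Freezing the merged hash guards against accidental mutation of the top-level keys, while nested values are only protected if their own `to_h` freezes them.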