Module: Karafka::Pro::Routing::Features::Pausing::Topic

Defined in:
lib/karafka/pro/routing/features/pausing/topic.rb

Overview

Expansion allowing for a per topic pause strategy definitions

Instance Method Summary collapse

Instance Method Details

#initializeObject

This method calls the parent class initializer and then sets up the extra instance variable to nil. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



17
18
19
20
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 17

def initialize(...)
  super
  @pausing = nil
end

#pause(timeout: nil, max_timeout: nil, with_exponential_backoff: nil) ⇒ Config

Allows for per-topic pausing strategy setting

Parameters:

  • timeout (Integer) (defaults to: nil)

    how long should we wait upon processing error (milliseconds)

  • max_timeout (Integer) (defaults to: nil)

    what is the max timeout in case of an exponential backoff (milliseconds)

  • with_exponential_backoff (Boolean) (defaults to: nil)

    should we use exponential backoff

Returns:

  • (Config)

    pausing config object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 29

def pause(timeout: nil, max_timeout: nil, with_exponential_backoff: nil)
  # If no arguments provided, just return or initialize the config
  return pausing if timeout.nil? && max_timeout.nil? && with_exponential_backoff.nil?

  # Update instance variables for backwards compatibility
  # This ensures code reading @pause_timeout directly or via the inherited getter
  # will get the correct values
  @pause_timeout = timeout if timeout
  @pause_max_timeout = max_timeout if max_timeout

  unless with_exponential_backoff.nil?
    @pause_with_exponential_backoff = with_exponential_backoff
  end

  # Create or update the config
  @pausing ||= Config.new(
    active: false,
    timeout: @pause_timeout || Karafka::App.config.pause.timeout,
    max_timeout: @pause_max_timeout || Karafka::App.config.pause.max_timeout,
    with_exponential_backoff: if @pause_with_exponential_backoff.nil?
                                Karafka::App.config.pause.with_exponential_backoff
                              else
                                @pause_with_exponential_backoff
                              end
  )

  @pausing.timeout = timeout if timeout
  @pausing.max_timeout = max_timeout if max_timeout

  unless with_exponential_backoff.nil?
    @pausing.with_exponential_backoff = with_exponential_backoff
  end

  @pausing.active = true

  @pausing
end

#pausingConfig

Returns pausing configuration object.

Returns:

  • (Config)

    pausing configuration object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 68

def pausing
  @pausing ||= Config.new(
    active: false,
    timeout: @pause_timeout || Karafka::App.config.pause.timeout,
    max_timeout: @pause_max_timeout || Karafka::App.config.pause.max_timeout,
    with_exponential_backoff: if @pause_with_exponential_backoff.nil?
                                Karafka::App.config.pause.with_exponential_backoff
                              else
                                @pause_with_exponential_backoff
                              end
  )
end

#pausing?Boolean

Returns is pausing explicitly configured.

Returns:

  • (Boolean)

    is pausing explicitly configured



82
83
84
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 82

def pausing?
  pausing.active?
end

#to_hHash

Returns topic with all its native configuration options plus pausing settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus pausing settings



87
88
89
90
91
# File 'lib/karafka/pro/routing/features/pausing/topic.rb', line 87

def to_h
  super.merge(
    pausing: pausing.to_h
  ).freeze
end