Module: Karafka::Pro::Routing::Features::PeriodicJob::Topic

Defined in:
lib/karafka/pro/routing/features/periodic_job/topic.rb

Overview

Periodic topic action flows extensions

Instance Method Summary collapse

Instance Method Details

#periodic_job(active = false, interval: nil, during_pause: nil, during_retry: nil) ⇒ Object Also known as: periodic

Defines topic as periodic. Periodic topics consumers will invoke #tick with each poll where messages were not received.

Parameters:

  • active (Boolean) (defaults to: false)

    should ticking happen for this topic assignments.

  • interval (Integer) (defaults to: nil)

    minimum interval to run periodic jobs on given topic.

  • during_pause (Boolean, nil) (defaults to: nil)

    Should periodic jobs run when partition is paused. It is set to nil by default allowing for detection when this value is not configured but should be built dynamically based on LRJ status.

  • during_retry (Boolean, nil) (defaults to: nil)

    Should we run when there was an error and we are in a retry flow. Please note that for this to work, during_pause also needs to be set to true as errors retry happens after pause.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/karafka/pro/routing/features/periodic_job/topic.rb', line 23

def periodic_job(
  active = false,
  interval: nil,
  during_pause: nil,
  during_retry: nil
)
  @periodic_job ||= begin
    # Set to active if any of the values was configured
    active = true unless interval.nil?
    active = true unless during_pause.nil?
    active = true unless during_retry.nil?
    # Default is not to retry during retry flow
    during_retry = false if during_retry.nil?

    # If no interval, use default
    interval ||= ::Karafka::App.config.internal.tick_interval

    Config.new(
      active: active,
      interval: interval,
      during_pause: during_pause,
      during_retry: during_retry,
      # This is internal setting for state management, not part of the configuration
      # Do not overwrite.
      # If `during_pause` is explicit, we do not select it based on LRJ setup and we
      # consider if fully ready out of the box
      materialized: !during_pause.nil?
    )
  end

  return @periodic_job if @periodic_job.materialized?
  return @periodic_job unless @long_running_job

  # If not configured in any way, we want not to process during pause for LRJ.
  # LRJ pauses by default when processing and during this time we do not want to
  # tick at all. This prevents us from running periodic jobs while LRJ jobs are
  # running. This of course has a side effect of not running when paused for any
  # other reason but it is a compromise in the default settings
  @periodic_job.during_pause = !long_running_job?
  @periodic_job.materialized = true

  @periodic_job
end

#periodic_job?Boolean

Returns is periodics active.

Returns:

  • (Boolean)

    is periodics active



70
71
72
# File 'lib/karafka/pro/routing/features/periodic_job/topic.rb', line 70

def periodic_job?
  periodic_job.active?
end

#to_hHash

Returns topic with all its native configuration options plus periodics flows settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus periodics flows settings



76
77
78
79
80
# File 'lib/karafka/pro/routing/features/periodic_job/topic.rb', line 76

def to_h
  super.merge(
    periodic_job: periodic_job.to_h
  ).freeze
end