Module: Karafka::Pro::Routing::Features::RecurringTasks::Builder

Included in:
Proxy
Defined in:
lib/karafka/pro/routing/features/recurring_tasks/builder.rb

Overview

Routing extensions for recurring tasks

Instance Method Summary collapse

Instance Method Details

#ensure_fugit_availability!Object

Checks if fugit is present. If not, will try to require it as it might not have been required but is available. If fails, will crash.



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/karafka/pro/routing/features/recurring_tasks/builder.rb', line 105

def ensure_fugit_availability!
  return if Object.const_defined?(:Fugit)

  require 'fugit'
rescue LoadError
  raise(
    ::Karafka::Errors::DependencyConstraintsError,
    <<~ERROR_MSG
      Failed to require fugit gem.
      Add it to your Gemfile, as it is required for the recurring tasks to work.
    ERROR_MSG
  )
end

#recurring_tasks(active = false, &block) ⇒ Object

Enabled recurring tasks operations and adds needed topics and other stuff.

Parameters:

  • active (Boolean) (defaults to: false)

    should recurring tasks be active. We use a boolean flag to have API consistency in the system, so it matches other routing related APIs.

  • block (Proc)

    optional reconfiguration of the topics definitions.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/karafka/pro/routing/features/recurring_tasks/builder.rb', line 18

def recurring_tasks(active = false, &block)
  return unless active

  # We only require zlib when we decide to run recurring tasks because it is not needed
  # otherwise.
  require 'zlib'
  ensure_fugit_availability!

  tasks_cfg = App.config.recurring_tasks
  topics_cfg = tasks_cfg.topics

  consumer_group tasks_cfg.group_id do
    # Registers the primary topic that we use to control schedules execution. This is
    # the one that we use to trigger recurring tasks.
    schedules_topic = topic(topics_cfg.schedules) do
      consumer tasks_cfg.consumer_class
      deserializer tasks_cfg.deserializer
      # Because the topic method name as well as builder proxy method name is the same
      # we need to reference it via target directly
      target.recurring_tasks(true)

      # We manage offsets directly because messages can have both schedules and
      # commands and we need to apply them only when we need to
      manual_offset_management(true)

      # We use multi-batch operations and in-memory state for schedules. This needs to
      # always operate without re-creation.
      consumer_persistence(true)

      # This needs to be enabled for the eof to work correctly
      kafka('enable.partition.eof': true, inherit: true)
      eofed(true)

      # Favour latency. This is a low traffic topic that only accepts user initiated
      # low-frequency commands
      max_messages(1)
      # Since we always react on the received message, we can block for longer periods
      # of time
      max_wait_time(10_000)

      # Since the execution of particular tasks is isolated and guarded, it should not
      # leak. This means, that this is to handle errors like schedule version
      # incompatibility and other errors that will not go away without a redeployment
      pause_timeout(60 * 1_000)
      pause_max_timeout(60 * 1_000)

      # Keep older data for a day and compact to the last state available
      config(
        'cleanup.policy': 'compact,delete',
        'retention.ms': 86_400_000
      )

      # This is the core of execution. Since we're producers of states, we need a way
      # to tick without having new data
      periodic(
        interval: App.config.recurring_tasks.interval,
        during_pause: false,
        during_retry: false
      )

      # If this is the direct schedules redefinition style, we run it
      # The second one (see end of this method) allows for linear reconfiguration of
      # both the topics
      instance_eval(&block) if block && block.arity.zero?
    end

    # This topic is to store logs that we can then inspect either from the admin or via
    # the Web UI
    logs_topic = topic(topics_cfg.logs) do
      active(false)
      deserializer tasks_cfg.deserializer
      target.recurring_tasks(true)

      # Keep cron logs of executions for a week and after that remove. Week should be
      # enough and should not produce too much data.
      config(
        'cleanup.policy': 'delete',
        'retention.ms': 604_800_000
      )
    end

    yield(schedules_topic, logs_topic) if block && block.arity.positive?
  end
end