Module: Karafka::Pro::Routing::Features::ScheduledMessages::Builder

Included in:
Proxy
Defined in:
lib/karafka/pro/routing/features/scheduled_messages/builder.rb

Overview

Routing extensions for scheduled messages

Instance Method Summary

Instance Method Details

#scheduled_messages(group_name = false, &block) ⇒ Object

Note:

The topic namespace should include the divider, as it is not added automatically.

Enables scheduled messages operations and adds the needed topics and related settings.

Parameters:

  • group_name (String, false) (defaults to: false)

    name of the scheduled messages topic, also used as a group identifier. Users can define multiple scheduled topics flows to prevent key collisions, prioritize dispatches, and so on. false if not active.

  • block (Proc)

    optional reconfiguration of the topic definitions. A zero-arity block is instance_eval'd inside the messages topic definition, while a positive-arity block receives both the messages and states topics (see the sketches below and after the source).
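
A minimal usage sketch (the application class and topic name are illustrative, not part of the API; note that the namespace divider must be included explicitly):

    class KarafkaApp < Karafka::App
      routes.draw do
        # 'my_app.' namespace with an explicit '.' divider (not added automatically)
        scheduled_messages('my_app.scheduled_messages')
      end
    end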

# File 'lib/karafka/pro/routing/features/scheduled_messages/builder.rb', line 21

def scheduled_messages(group_name = false, &block)
  return unless group_name

  # Load zlib only if user enables scheduled messages
  require 'zlib'

  # We set it to 5 so we have enough space to handle more events. All related topics
  # should have the same partition count.
  default_partitions = 5
  msg_cfg = App.config.scheduled_messages

  consumer_group msg_cfg.group_id do
    # Registers the primary topic that we use to control schedule execution. This is
    # the one we use to trigger scheduled messages.
    messages_topic = topic(group_name) do
      consumer msg_cfg.consumer_class

      deserializers(
        headers: msg_cfg.deserializers.headers
      )

      # Because the topic method name and the builder proxy method name are the same,
      # we need to reference it via target directly
      target.scheduled_messages(true)

      # We manage offsets directly because messages can have both schedules and
      # commands and we need to apply them only when we need to
      manual_offset_management(true)

      # We use multi-batch operations and in-memory state for schedules. This needs to
      # always operate without re-creation.
      consumer_persistence(true)

      # This needs to be enabled for the eof to work correctly
      kafka('enable.partition.eof': true, inherit: true)
      eofed(true)

      # Since this is a topic that gets replayed because of schedule management, we do
      # want to get more data faster during recovery
      max_messages(10_000)

      # Short max wait so long polls do not delay the periodic tick handling
      max_wait_time(1_000)

      # This is a setup that should allow messages to be compacted fairly fast. Since
      # each dispatched message should be removed via tombstone, they do not have to
      # be present in the topic for too long.
      config(
        partitions: default_partitions,
        # Ensures that once a tombstone is present, the dispatched scheduled message
        # is removed by Kafka
        'cleanup.policy': 'compact',
        # When 10% or more dispatches are done, compact data
        'min.cleanable.dirty.ratio': 0.1,
        # Frequent segment rotation to support intense compaction
        'segment.ms': 3_600_000,
        'delete.retention.ms': 3_600_000,
        'segment.bytes': 52_428_800
      )

      # This is the core of execution. Since we dispatch data in time intervals, we
      # need to be able to do this even when no new data is coming
      periodic(
        interval: msg_cfg.interval,
        during_pause: false,
        during_retry: false
      )

      # If this is the direct schedules redefinition style (zero-arity block), we run
      # it here. The second style (see the end of this method) allows for linear
      # reconfiguration of both topics
      instance_eval(&block) if block && block.arity.zero?
    end

    # Holds the state of the scheduler per partition, since partitions tick
    # independently. We only hold future statistics so that we do not have to deal
    # with any type of state restoration
    states_topic = topic("#{group_name}#{msg_cfg.states_postfix}") do
      active(false)
      target.scheduled_messages(true)
      config(
        partitions: default_partitions,
        'cleanup.policy': 'compact',
        'min.cleanable.dirty.ratio': 0.1,
        'segment.ms': 3_600_000,
        'delete.retention.ms': 3_600_000,
        'segment.bytes': 52_428_800
      )
      deserializers(
        payload: msg_cfg.deserializers.payload
      )
    end

    yield(messages_topic, states_topic) if block && block.arity.positive?
  end
end
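
Both block styles can be sketched as follows (a hedged sketch; topic names and partition counts are illustrative, and it assumes the declarative topics config exposes partitions as a writable attribute):

    # Zero-arity block: instance_eval'd inside the messages topic definition,
    # so regular topic DSL calls reconfigure that topic directly
    scheduled_messages('my_app.scheduled_messages') do
      max_messages(5_000)
    end

    # Positive-arity block: both topics are yielded for linear reconfiguration
    scheduled_messages('my_app.scheduled_messages') do |messages_topic, states_topic|
      messages_topic.config.partitions = 10
      states_topic.config.partitions = 10
    end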