Module: Karafka::Pro::Routing::Features::ScheduledMessages::Builder

Included in:
Proxy
Defined in:
lib/karafka/pro/routing/features/scheduled_messages/builder.rb

Overview

Routing extensions for scheduled messages

Instance Method Summary collapse

Instance Method Details

#scheduled_messages(group_name = false, &block) ⇒ Object

Note:

Namespace for topics should include the divider as it is not automatically added.

Enables scheduled messages operations and adds the needed topics along with their required configuration.

Parameters:

  • group_name (String, false) (defaults to: false)

    Name of the scheduled messages topic, which is also used as a group identifier. Users can have multiple scheduled messages topic flows, for example to prevent key collisions or to prioritize certain schedules. Pass false when the feature is not active.

  • block (Proc)

    optional reconfiguration of the topics definitions.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/karafka/pro/routing/features/scheduled_messages/builder.rb', line 29

# Enables scheduled messages by registering, inside the configured consumer group, the
# topic that carries the schedules and a companion topic that carries scheduler states.
#
# @param group_name [String, false] name of the scheduled messages topic. The states topic
#   name is built from it by appending the configured states postfix. When falsy, nothing
#   is registered and nil is returned.
# @param block [Proc] optional reconfiguration of the topics definitions. A block without
#   parameters is evaluated in the context of the messages topic definition, both before
#   and after the defaults are applied. A block with parameters is yielded both topics
#   (messages topic first, states topic second) once they are defined.
def scheduled_messages(group_name = false, &block)
  return unless group_name

  # Zlib is required lazily, so it gets loaded only when scheduled messages are enabled
  require 'zlib'

  settings = App.config.scheduled_messages

  # Two reconfiguration styles are supported and they are told apart by the block arity
  reconfigure_in_topic = !block.nil? && block.arity.zero?
  reconfigure_with_topics = !block.nil? && block.arity.positive?

  # Shared by both topics, as all related topics should have the same partition count.
  # Five partitions leave enough space to handle more events.
  # The remaining settings should allow messages to be compacted fairly fast: each
  # dispatched message should be removed via a tombstone, so it does not have to stay in
  # the topic for long.
  compacted_topic_config = {
    partitions: 5,
    # Once a tombstone is present, Kafka removes the already dispatched message
    'cleanup.policy': 'compact',
    # Compact when 10% or more of the dispatches are done
    'min.cleanable.dirty.ratio': 0.1,
    # Frequent segment rotation to support intense compaction
    'segment.ms': 3_600_000,
    'delete.retention.ms': 3_600_000,
    'segment.bytes': 52_428_800
  }.freeze

  consumer_group(settings.group_id) do
    # Primary topic that controls schedules execution and triggers scheduled messages
    schedules_topic = topic(group_name) do
      instance_eval(&block) if reconfigure_in_topic

      consumer(settings.consumer_class)
      deserializers(headers: settings.deserializers.headers)

      # The topic method and the builder proxy method share the same name, hence the
      # topic one has to be reached through the target directly
      target.scheduled_messages(true)

      # Offsets are managed directly, because messages can carry both schedules and
      # commands and those need to be applied only when needed
      manual_offset_management(true)

      # Multi-batch operations and in-memory schedules state require the consumer to
      # always operate without being re-created
      consumer_persistence(true)

      # Needed for the eof to work correctly
      kafka('enable.partition.eof': true, inherit: true)
      eofed(true)

      # This topic gets replayed because of the schedule management, so we want to get
      # more data faster during recovery
      max_messages(10_000)
      max_wait_time(1_000)

      config(**compacted_topic_config)

      # Core of the execution: data is dispatched in time intervals, so it has to be
      # possible also when no new data is coming
      periodic(interval: settings.interval, during_pause: false, during_retry: false)

      # Direct schedules redefinition style. The other style (end of this method) allows
      # for a linear reconfiguration of both topics.
      instance_eval(&block) if reconfigure_in_topic
    end

    # Holds the scheduler states per partition, since partitions tick independently.
    # Only future statistics are kept, so no state restoration has to be dealt with.
    states_topic_name = "#{group_name}#{settings.states_postfix}"

    tracking_topic = topic(states_topic_name) do
      active(false)
      target.scheduled_messages(true)
      config(**compacted_topic_config)
      deserializers(payload: settings.deserializers.payload)
    end

    yield(schedules_topic, tracking_topic) if reconfigure_with_topics
  end
end