Module: Karafka::Pro::Routing::Features::ScheduledMessages::Builder
- Included in:
- Proxy
- Defined in:
- lib/karafka/pro/routing/features/scheduled_messages/builder.rb
Overview
Routing extensions for scheduled messages
Instance Method Summary
- #scheduled_messages(group_name = false, &block) ⇒ Object
Enables scheduled messages operations and registers the required topics with their configuration.
Instance Method Details
#scheduled_messages(group_name = false, &block) ⇒ Object
Note:
Namespace for topics should include the divider as it is not automatically added.
Enables scheduled messages operations and registers the required topics with their configuration.
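The note about dividers can be illustrated with the states topic name, which the method's source builds by plain string interpolation of the group name and a configured postfix. The following is a minimal sketch, not Karafka code: `states_topic_name` is a hypothetical helper, and the postfix values are illustrative assumptions rather than values confirmed by this page.

```ruby
# Hypothetical helper mirroring the interpolation used for the states topic:
#   "#{group_name}#{msg_cfg.states_postfix}"
# Nothing is inserted between the two parts, so any divider has to be
# part of the postfix (or of the group name) itself.
def states_topic_name(group_name, states_postfix)
  "#{group_name}#{states_postfix}"
end

# Assumed example values, chosen only to show the difference.
WITH_DIVIDER = states_topic_name('scheduled_messages', '_states')
WITHOUT_DIVIDER = states_topic_name('scheduled_messages', 'states')

puts WITH_DIVIDER
puts WITHOUT_DIVIDER
```

The second name runs the two parts together, which is the outcome the note warns about when the divider is left out.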
# File 'lib/karafka/pro/routing/features/scheduled_messages/builder.rb', line 21

def scheduled_messages(group_name = false, &block)
  return unless group_name

  # Load zlib only if user enables scheduled messages
  require 'zlib'

  # We set it to 5 so we have enough space to handle more events. All related topics
  # should have same partition count.
  default_partitions = 5
  msg_cfg = App.config.scheduled_messages

  consumer_group msg_cfg.group_id do
    # Registers the primary topic that we use to control schedules execution. This is
    # the one that we use to trigger scheduled messages.
    messages_topic = topic(group_name) do
      instance_eval(&block) if block && block.arity.zero?

      consumer msg_cfg.consumer_class

      deserializers(
        headers: msg_cfg.deserializers.headers
      )

      # Because the topic method name as well as builder proxy method name is the same
      # we need to reference it via target directly
      target.scheduled_messages(true)

      # We manage offsets directly because messages can have both schedules and
      # commands and we need to apply them only when we need to
      manual_offset_management(true)

      # We use multi-batch operations and in-memory state for schedules. This needs to
      # always operate without re-creation.
      consumer_persistence(true)

      # This needs to be enabled for the eof to work correctly
      kafka('enable.partition.eof': true, inherit: true)
      eofed(true)

      # Since this is a topic that gets replayed because of schedule management, we do
      # want to get more data faster during recovery
      max_messages(10_000)

      max_wait_time(1_000)

      # This is a setup that should allow messages to be compacted fairly fast. Since
      # each dispatched message should be removed via tombstone, they do not have to
      # be present in the topic for too long.
      config(
        partitions: default_partitions,
        # Will ensure, that after tombstone is present, given scheduled message, that
        # has been dispatched is removed by Kafka
        'cleanup.policy': 'compact',
        # When 10% or more dispatches are done, compact data
        'min.cleanable.dirty.ratio': 0.1,
        # Frequent segment rotation to support intense compaction
        'segment.ms': 3_600_000,
        'delete.retention.ms': 3_600_000,
        'segment.bytes': 52_428_800
      )

      # This is the core of execution. Since we dispatch data in time intervals, we
      # need to be able to do this even when no new data is coming
      periodic(
        interval: msg_cfg.interval,
        during_pause: false,
        during_retry: false
      )

      # If this is the direct schedules redefinition style, we run it
      # The second one (see end of this method) allows for linear reconfiguration of
      # both the topics
      instance_eval(&block) if block && block.arity.zero?
    end

    # Holds states of scheduler per each of the partitions since they tick
    # independently. We only hold future statistics not to have to deal with
    # any type of state restoration
    states_topic = topic("#{group_name}#{msg_cfg.states_postfix}") do
      active(false)
      target.scheduled_messages(true)
      config(
        partitions: default_partitions,
        'cleanup.policy': 'compact',
        'min.cleanable.dirty.ratio': 0.1,
        'segment.ms': 3_600_000,
        'delete.retention.ms': 3_600_000,
        'segment.bytes': 52_428_800
      )
      deserializers(
        payload: msg_cfg.deserializers.payload
      )
    end

    yield(messages_topic, states_topic) if block && block.arity.positive?
  end
end
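The source above accepts two block styles and tells them apart by arity: a block without parameters is evaluated inside the primary topic, while a block with parameters is yielded both topics. The sketch below models only that dispatch in plain Ruby. `FakeTopic` and `configure_topics` are hypothetical stand-ins, not part of Karafka, and the `_states` suffix is an assumed value.

```ruby
# Hypothetical stand-in for a routing topic; it only records settings.
class FakeTopic
  attr_reader :name, :settings

  def initialize(name)
    @name = name
    @settings = {}
  end

  def max_wait_time(value)
    @settings[:max_wait_time] = value
  end
end

# Simplified model of the arity-based dispatch:
# - zero-arity blocks are instance_eval'ed in the primary topic,
# - blocks with parameters receive both topics as arguments.
def configure_topics(group_name, &block)
  messages_topic = FakeTopic.new(group_name)
  states_topic = FakeTopic.new("#{group_name}_states") # assumed suffix

  messages_topic.instance_eval(&block) if block && block.arity.zero?
  yield(messages_topic, states_topic) if block && block.arity.positive?

  [messages_topic, states_topic]
end

# Direct style: the block runs with the primary topic as self.
DIRECT = configure_topics('schedules') { max_wait_time(500) }

# Linear style: both topics are passed in and configured explicitly.
LINEAR = configure_topics('schedules') do |messages, states|
  messages.max_wait_time(2_000)
  states.max_wait_time(3_000)
end
```

In this model the direct style can only reach the primary topic, whereas the linear style can adjust the states topic as well. A splat-only block such as `|*args|` has an arity of -1, so it would match neither branch.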