Class: Karafka::Pro::ScheduledMessages::Dispatcher
- Inherits:
-
Object
- Object
- Karafka::Pro::ScheduledMessages::Dispatcher
- Defined in:
- lib/karafka/pro/scheduled_messages/dispatcher.rb
Overview
Dispatcher responsible for dispatching the messages to appropriate target topics and for dispatching other messages. All messages (aside from the once users dispatch with the envelope) are sent via this dispatcher.
Messages are buffered and dispatched in batches to improve dispatch performance.
Instance Attribute Summary collapse
-
#buffer ⇒ Array<Hash>
readonly
Buffer with message hashes for dispatch.
Instance Method Summary collapse
-
#<<(message) ⇒ Object
Prepares the scheduled message to the dispatch to the target topic.
-
#flush ⇒ Object
Sends all messages to Kafka in a sync way.
-
#initialize(topic, partition) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#state(tracker) ⇒ Object
Builds and dispatches the state report message with schedules details.
Constructor Details
#initialize(topic, partition) ⇒ Dispatcher
Returns a new instance of Dispatcher.
20 21 22 23 24 25 |
# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 20 def initialize(topic, partition) @topic = topic @partition = partition @buffer = [] @serializer = Serializer.new end |
Instance Attribute Details
#buffer ⇒ Array<Hash> (readonly)
Returns buffer with message hashes for dispatch.
16 17 18 |
# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 16 def buffer @buffer end |
Instance Method Details
#<<(message) ⇒ Object
This method adds the message to the buffer, does not dispatch it.
It also produces needed tombstone event as well as an audit log message
Prepares the scheduled message to the dispatch to the target topic. Extracts all the “schedule_” details and prepares it, so the dispatched message goes with the expected attributes to the desired location. Alongside of that it actually builds 2 (1 if logs off) messages: tombstone event matching the schedule so it is no longer valid and the log message that has the same data as the dispatched message. Helpful when debugging.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 38 def <<() target_headers = .raw_headers.merge( 'schedule_source_topic' => @topic, 'schedule_source_partition' => @partition.to_s, 'schedule_source_offset' => .offset.to_s, 'schedule_source_key' => .key ).compact target = { payload: .raw_payload, headers: target_headers } extract(target, .headers, :topic) extract(target, .headers, :partition) extract(target, .headers, :key) extract(target, .headers, :partition_key) @buffer << target # Tombstone message so this schedule is no longer in use and gets removed from Kafka by # Kafka itself during compacting. It will not cancel it because already dispatched but # will cause it not to be sent again and will be marked as dispatched. @buffer << Proxy.tombstone(message: ) end |
#flush ⇒ Object
Sends all messages to Kafka in a sync way. We use sync with batches to prevent overloading. When transactional producer in use, this will be wrapped in a transaction automatically.
82 83 84 85 86 87 88 89 90 |
# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 82 def flush until @buffer.empty? config.producer.produce_many_sync( # We can remove this prior to the dispatch because we only evict messages from the # daily buffer once dispatch is successful @buffer.shift(config.flush_batch_size) ) end end |
#state(tracker) ⇒ Object
This is dispatched async because it’s just a statistical metric.
Builds and dispatches the state report message with schedules details
69 70 71 72 73 74 75 76 77 |
# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 69 def state(tracker) config.producer.produce_async( topic: "#{@topic}#{config.states_postfix}", payload: @serializer.state(tracker), key: 'state', partition: @partition, headers: { 'zlib' => 'true' } ) end |