Module: Karafka::Pro::ScheduledMessages::Proxy
- Defined in:
- lib/karafka/pro/scheduled_messages/proxy.rb
Overview
Proxy used to wrap the scheduled messages with the correct dispatch envelope. Each message that goes to the scheduler topic needs to have specific headers and other details that are required by the system so we know how and when to dispatch it.
Each message that goes to the proxy topic needs to have a unique key. We inject those automatically unless user provides one in an envelope. Since we want to make sure, that the messages dispatched by the user all go to the same partition (if with same key), we inject a partition_key based on the user key or other details if present. That allows us to make sure, that they will always go to the same partition on our side.
This wrapper validates the initial message that user wants to send in the future, as well as the envelope and specific requirements for a message to be send in the future
Class Method Summary collapse
-
.cancel(key:, envelope: {}) ⇒ Hash
Generates a tombstone message to cancel already scheduled message dispatch.
-
.schedule(message:, epoch:, envelope: {}) ⇒ Hash
Generates a schedule message envelope wrapping the original dispatch.
-
.tombstone(message:) ⇒ Object
Builds tombstone with the dispatched message details.
Class Method Details
.cancel(key:, envelope: {}) ⇒ Hash
Technically it is a tombstone but we differentiate just for the sake of ability to debug stuff if needed
Generates a tombstone message to cancel already scheduled message dispatch
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 95 def cancel(key:, envelope: {}) = { key: key, payload: nil, headers: { 'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION, 'schedule_source_type' => 'cancel' } }.merge(envelope) # Ensure user provided envelope is with all expected details validate!() end |
.schedule(message:, epoch:, envelope: {}) ⇒ Hash
This proxy does not inject the dispatched messages topic unless provided in the envelope. That’s because user can have multiple scheduled messages topics to group outgoing messages, etc.
Generates a schedule message envelope wrapping the original dispatch
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 59 def schedule(message:, epoch:, envelope: {}) # We need to ensure that the message we want to proxy is fully legit. Otherwise, since # we envelope details like target topic, we could end up having incorrect data to # schedule MSG_CONTRACT.validate!(, WaterDrop::Errors::MessageInvalidError) headers = ([:headers] || {}).merge( 'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION, 'schedule_target_epoch' => epoch.to_i.to_s, 'schedule_source_type' => 'schedule' ) export(headers, , :topic) export(headers, , :partition) export(headers, , :key) export(headers, , :partition_key) = { payload: [:payload], headers: headers }.merge(envelope) enrich(, ) validate!() end |
.tombstone(message:) ⇒ Object
Builds tombstone with the dispatched message details. Those details can be used in Web UI, etc when analyzing dispatches.
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 115 def tombstone(message:) { key: .key, payload: nil, topic: .topic, partition: .partition, headers: .raw_headers.merge( 'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION, 'schedule_source_type' => 'tombstone', 'schedule_source_offset' => .offset.to_s ) } end |