Module: Karafka::Pro::ScheduledMessages::Proxy

Defined in:
lib/karafka/pro/scheduled_messages/proxy.rb

Overview

Proxy used to wrap the scheduled messages with the correct dispatch envelope. Each message that goes to the scheduler topic needs to have specific headers and other details that are required by the system so we know how and when to dispatch it.

Each message that goes to the proxy topic needs to have a unique key. We inject those automatically unless user provides one in an envelope. Since we want to make sure, that the messages dispatched by the user all go to the same partition (if with same key), we inject a partition_key based on the user key or other details if present. That allows us to make sure, that they will always go to the same partition on our side.

This wrapper validates the initial message that user wants to send in the future, as well as the envelope and specific requirements for a message to be send in the future

Class Method Summary collapse

Class Method Details

.cancel(key:, envelope: {}) ⇒ Hash

Note:

Technically it is a tombstone but we differentiate just for the sake of ability to debug stuff if needed

Generates a tombstone message to cancel already scheduled message dispatch

Parameters:

  • key (String)

    key used by the original message as a unique identifier

  • envelope (Hash) (defaults to: {})

    Special details that can identify the message location like topic and partition (if used) so the cancellation goes to the correct location.

Returns:

  • (Hash)

    cancellation message



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 95

def cancel(key:, envelope: {})
  proxy_message = {
    key: key,
    payload: nil,
    headers: {
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'cancel'
    }
  }.merge(envelope)

  # Ensure user provided envelope is with all expected details
  validate!(proxy_message)

  proxy_message
end

.schedule(message:, epoch:, envelope: {}) ⇒ Hash

Note:

This proxy does not inject the dispatched messages topic unless provided in the envelope. That’s because user can have multiple scheduled messages topics to group outgoing messages, etc.

Generates a schedule message envelope wrapping the original dispatch

Parameters:

  • message (Hash)

    message hash of a message that would originally go to WaterDrop producer directly.

  • epoch (Integer)

    time in the future (or now) when dispatch this message in the Unix epoch timestamp

  • envelope (Hash) (defaults to: {})

    Special details that the envelop needs to have, like a unique key. If unique key is not provided we build a random unique one and use a partition_key based on the original message key (if present) to ensure that all relevant messages are dispatched to the same topic partition.

Returns:

  • (Hash)

    dispatched message wrapped with an envelope



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 59

def schedule(message:, epoch:, envelope: {})
  # We need to ensure that the message we want to proxy is fully legit. Otherwise, since
  # we envelope details like target topic, we could end up having incorrect data to
  # schedule
  MSG_CONTRACT.validate!(message, WaterDrop::Errors::MessageInvalidError)

  headers = (message[:headers] || {}).merge(
    'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
    'schedule_target_epoch' => epoch.to_i.to_s,
    'schedule_source_type' => 'schedule'
  )

  export(headers, message, :topic)
  export(headers, message, :partition)
  export(headers, message, :key)
  export(headers, message, :partition_key)

  proxy_message = {
    payload: message[:payload],
    headers: headers
  }.merge(envelope)

  enrich(proxy_message, message)
  validate!(proxy_message)

  proxy_message
end

.tombstone(message:) ⇒ Object

Builds tombstone with the dispatched message details. Those details can be used in Web UI, etc when analyzing dispatches.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to tombstone topic and partition (if used) so the cancellation goes to the correct location.



115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 115

def tombstone(message:)
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'tombstone',
      'schedule_source_offset' => message.offset.to_s
    )
  }
end