Module: Karafka::Pro::ScheduledMessages

Defined in:
lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb

Overview

This feature proxies messages through a dedicated topic from which they are dispatched at a later time, hence the name scheduled messages. Such messages must follow a specific envelope format; aside from that, they are regular Kafka messages.

This work was conceptually inspired by the Go scheduler at github.com/etf1/kafka-message-scheduler. Only the concept of daily in-memory scheduling was borrowed; I did not look at the implementation itself.

Defined Under Namespace

Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup

Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker

Constant Summary

SCHEMA_VERSION = '1.0.0'

Version of the schema we use for envelopes in scheduled messages. We use it to detect potential upgrades, similar to other components of Karafka, and to stop processing incompatible versions.

STATES_SCHEMA_VERSION = '1.0.0'

Version of the states schema. Used to publish simple, per-partition aggregated metrics that can be used for schedules reporting.
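
To illustrate how a schema version constant can be used to stop processing incompatible envelopes, here is a minimal sketch. The `DemoSchemaCheck` module, its error class, and the rule it applies (reject any envelope whose version is newer than the one the consumer knows) are assumptions made for illustration; they are not taken from Karafka's `SchemaValidator`.

```ruby
# Hypothetical stand-in; this is not Karafka's SchemaValidator.
module DemoSchemaCheck
  # Schema version this illustrative consumer understands
  SCHEMA_VERSION = '1.0.0'

  # Raised when an envelope is newer than what we can process
  IncompatibleSchemaError = Class.new(StandardError)

  module_function

  # @param message_version [String] version carried by the envelope
  # @return [Boolean] true when the envelope is not newer than our schema
  def compatible?(message_version)
    Gem::Version.new(message_version) <= Gem::Version.new(SCHEMA_VERSION)
  end

  # Raises instead of processing an envelope we may not understand
  # @param message_version [String] version carried by the envelope
  def validate!(message_version)
    return if compatible?(message_version)

    raise IncompatibleSchemaError, "unsupported schema: #{message_version}"
  end
end
```

Comparing through `Gem::Version` rather than plain strings keeps the ordering correct once a version component reaches two digits (for example `1.10.0` versus `1.9.0`).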

Class Method Summary

Class Method Details

.cancel(**kwargs) ⇒ Hash

Generates a tombstone message that cancels the given dispatch, provided the dispatch has not happened yet.

Parameters:

  • kwargs (Hash)

    keyword arguments forwarded as-is to Proxy.cancel

Returns:

  • (Hash)

    tombstone cancelling message



# File 'lib/karafka/pro/scheduled_messages.rb', line 44

def cancel(**kwargs)
  Proxy.cancel(**kwargs)
end
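
The tombstone idea can be sketched without Karafka loaded. In the example below, `DemoCancelProxy`, `DemoCancellation`, the `key:` and `envelope:` keywords, and the header name are all hypothetical stand-ins chosen for illustration; the real keyword arguments are defined by `Proxy.cancel`, which is not shown in this section.

```ruby
# Hypothetical stand-in for the proxy; the header name is illustrative only.
module DemoCancelProxy
  module_function

  # Builds a tombstone: same key as the scheduled message, no payload.
  # @param key [String] key under which the message was scheduled
  # @param envelope [Hash] must contain the :topic of the scheduling topic
  # @return [Hash] message hash that could be handed to a producer
  def cancel(key:, envelope:)
    {
      topic: envelope.fetch(:topic),
      key: key,
      payload: nil,
      headers: { 'demo_source_type' => 'cancel' }
    }
  end
end

# Thin facade that forwards all keyword arguments, as in the method above
module DemoCancellation
  module_function

  def cancel(**kwargs)
    DemoCancelProxy.cancel(**kwargs)
  end
end

tombstone = DemoCancellation.cancel(
  key: 'order-42',
  envelope: { topic: 'demo_scheduled_messages' }
)
```

Because the facade only forwards `**kwargs`, any validation of the arguments lives in the proxy, and a missing keyword surfaces there as an `ArgumentError`.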

.post_setup(config) ⇒ Object

Validates the root config against a config contract once setup has completed.

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 61

def post_setup(config)
  RecurringTasks::Contracts::Config.new.validate!(config.to_h)
end

.pre_setup(config) ⇒ Object

Expands the root config with the scheduled_messages settings scope used by this feature.

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 53

def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:scheduled_messages, default: Setup::Config.config)
  end
end
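
The `instance_eval` pattern used above can be tried in isolation. The sketch below uses a hypothetical `DemoNode` class in place of `Karafka::Core::Configurable::Node`, and the default value is a placeholder; it only shows why `setting` resolves against the config node inside the block.

```ruby
# Hypothetical minimal config node; Karafka uses its own configurable node.
class DemoNode
  def initialize
    @settings = {}
  end

  # Registers a setting with a default and defines a reader for it
  def setting(name, default: nil)
    @settings[name] = default
    define_singleton_method(name) { @settings[name] }
  end

  # @return [Hash] copy of all registered settings
  def to_h
    @settings.dup
  end
end

# Mirrors the shape of pre_setup with the stand-in node
def demo_pre_setup(config)
  # instance_eval runs the block with self set to config,
  # so the bare `setting` call is dispatched to the node
  config.instance_eval do
    setting(:scheduled_messages, default: { demo_option: 1 })
  end
end

root = DemoNode.new
demo_pre_setup(root)
root.scheduled_messages # the placeholder default registered above
```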

.schedule(**kwargs) ⇒ Hash

Wraps a message in the scheduled message envelope by delegating to Proxy.schedule.

Parameters:

  • kwargs (Hash)

    keyword arguments forwarded as-is to Proxy.schedule

Returns:

  • (Hash)

    message wrapped with the scheduled message envelope



# File 'lib/karafka/pro/scheduled_messages.rb', line 37

def schedule(**kwargs)
  Proxy.schedule(**kwargs)
end
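
As a rough illustration of what wrapping a message in an envelope means, the sketch below builds one by hand. `DemoScheduleProxy`, its `message:`, `epoch:` and `envelope:` keywords, the fallback key, and the header names are assumptions made for this example; the actual envelope layout is defined by `Proxy.schedule` and the message contract, neither of which is shown in this section.

```ruby
# Hypothetical stand-in for the proxy; the envelope layout is illustrative only.
module DemoScheduleProxy
  module_function

  # @param message [Hash] message to deliver later (:topic and :payload)
  # @param epoch [Integer] Unix timestamp at which to dispatch
  # @param envelope [Hash] must contain :topic of the scheduling topic,
  #   may contain :key used later to cancel the dispatch
  # @return [Hash] message addressed to the scheduling topic
  def schedule(message:, epoch:, envelope:)
    {
      # The envelope goes to the scheduling topic, not the final one
      topic: envelope.fetch(:topic),
      # Fall back to a derived key when the caller did not provide one
      key: envelope.fetch(:key) { "#{message.fetch(:topic)}-#{epoch}" },
      payload: message.fetch(:payload),
      # The final destination and dispatch time travel in the headers
      headers: {
        'demo_target_topic' => message.fetch(:topic),
        'demo_target_epoch' => epoch.to_s
      }
    }
  end
end

enveloped = DemoScheduleProxy.schedule(
  message: { topic: 'events', payload: 'hello' },
  epoch: Time.now.to_i + 60,
  envelope: { topic: 'demo_scheduled_messages' }
)
```

The returned hash is still a regular message: it is simply addressed to the scheduling topic, while the original destination and the dispatch time are carried as metadata.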