Module: Karafka::Pro::ScheduledMessages

Defined in:
lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb

Overview

This feature allows for proxying messages via a special topic that can dispatch them at a later time, hence scheduled messages. Such messages need to have a special format but aside from that they are regular Kafka messages.

This work was conceptually inspired by the Go scheduler github.com/etf1/kafka-message-scheduler, though I did not look at the implementation itself, only at the concept of daily in-memory scheduling.

Defined Under Namespace

Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup

Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker

Constant Summary

SCHEMA_VERSION = '1.0.0'

Version of the schema we use for envelopes in scheduled messages. We use it to detect any potential upgrades, similar to other components of Karafka, and to stop processing of incompatible versions.

STATES_SCHEMA_VERSION = '1.0.0'

Version of the states schema. Used to publish simple per-partition aggregated metrics that can be used for schedules reporting.
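To illustrate how a schema version can be used to stop processing of incompatible messages, here is a minimal sketch. It is not Karafka's SchemaValidator: the module name, the error class, and the `schedule_schema_version` header name are assumptions made for this example, and the comparison rule (reject anything newer than the supported version) is one plausible policy.

```ruby
# Illustrative stand-in, not part of Karafka. All names here are hypothetical.
module SchemaCheckSketch
  # The schema version this process understands (mirrors SCHEMA_VERSION above)
  SUPPORTED = Gem::Version.new('1.0.0')

  # Raised when a message declares a schema newer than the supported one
  class IncompatibleSchemaError < StandardError; end

  module_function

  # @param headers [Hash] message headers; the header name is an assumption
  # @return [true] when the declared version is not newer than SUPPORTED
  def ensure_compatible!(headers)
    declared = Gem::Version.new(headers.fetch('schedule_schema_version'))
    return true if declared <= SUPPORTED

    raise IncompatibleSchemaError, "unsupported schema version: #{declared}"
  end
end
```

Comparing with `Gem::Version` rather than plain strings avoids ordering surprises such as `'1.10.0' < '1.9.0'`.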

Class Method Summary

Class Method Details

.cancel(**kwargs) ⇒ Hash

Generates a tombstone message that cancels a given dispatch (if it has not happened yet).

Parameters:

  • kwargs (Hash)

    keyword arguments forwarded to Proxy.cancel

Returns:

  • (Hash)

    tombstone cancelling message



# File 'lib/karafka/pro/scheduled_messages.rb', line 36

def cancel(**kwargs)
  Proxy.cancel(**kwargs)
end
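As a rough illustration of what a tombstone looks like, the sketch below builds one by hand: a message sent to the scheduling topic with the same key as the scheduled message and a nil payload. `CancelSketch`, its argument names, and the header it sets are assumptions for this example; the hash actually returned by `Proxy.cancel` may contain different fields.

```ruby
# Illustrative stand-in, not Karafka's Proxy. All names here are hypothetical.
module CancelSketch
  module_function

  # @param key [String] key under which the message was originally scheduled
  # @param envelope [Hash] where the tombstone goes; must include :topic
  # @return [Hash] tombstone message with a nil payload
  def cancel(key:, envelope:)
    {
      topic: envelope.fetch(:topic),
      key: key,
      # A nil payload is what makes this message a tombstone
      payload: nil,
      # Assumed header marking the message as a cancellation
      headers: { 'schedule_source_type' => 'cancel' }
    }
  end
end

tombstone = CancelSketch.cancel(key: 'order-42', envelope: { topic: 'scheduled_messages' })
```

Because the tombstone reuses the original key, it lands in the same partition as the scheduled message (under key-based partitioning), so the consumer that holds the pending dispatch is the one that sees the cancellation.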

.post_setup(config) ⇒ Object

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 53

def post_setup(config)
  RecurringTasks::Contracts::Config.new.validate!(config.to_h)
end

.pre_setup(config) ⇒ Object

Sets up additional config scope, validations and other things

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 45

def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:scheduled_messages, default: Setup::Config.config)
  end
end

.schedule(**kwargs) ⇒ Hash

Delegates to Proxy.schedule, which wraps the given message in a scheduled message envelope.

Parameters:

  • kwargs (Hash)

    keyword arguments forwarded to Proxy.schedule

Returns:

  • (Hash)

    message wrapped with the scheduled message envelope



# File 'lib/karafka/pro/scheduled_messages.rb', line 29

def schedule(**kwargs)
  Proxy.schedule(**kwargs)
end
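The envelope idea can be sketched as follows: the original message is re-addressed to the scheduling topic, while its real destination and dispatch time travel in headers. `ScheduleSketch`, its keyword arguments, and the header names are assumptions for this example and are not guaranteed to match what `Proxy.schedule` accepts or produces.

```ruby
require 'securerandom'

# Illustrative stand-in, not Karafka's Proxy. All names here are hypothetical.
module ScheduleSketch
  module_function

  # @param message [Hash] the message to deliver later; must include :topic
  # @param epoch [Integer] Unix timestamp at which it should be dispatched
  # @param envelope [Hash] scheduling topic details; must include :topic
  # @return [Hash] the message wrapped in a scheduling envelope
  def schedule(message:, epoch:, envelope:)
    raise ArgumentError, 'epoch must be in the future' unless epoch > Time.now.to_i

    {
      topic: envelope.fetch(:topic),
      # A unique key lets the schedule be cancelled later
      key: envelope.fetch(:key) { SecureRandom.uuid },
      payload: message[:payload],
      headers: (message[:headers] || {}).merge(
        'schedule_source_type' => 'schedule',
        'schedule_target_topic' => message.fetch(:topic),
        'schedule_target_epoch' => epoch.to_s
      )
    }
  end
end

wrapped = ScheduleSketch.schedule(
  message: { topic: 'events', payload: '{"id":1}' },
  epoch: Time.now.to_i + 60,
  envelope: { topic: 'scheduled_messages', key: 'order-42' }
)
```

The returned hash is an ordinary message addressed to the scheduling topic, so it would still need to be handed to a producer to be published.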