Module: Karafka::Pro::ScheduledMessages

Defined in:
lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb

Overview

This feature allows proxying messages via a special topic that can dispatch them at a later time, hence the name: scheduled messages. Such messages need to follow a special format, but aside from that they are regular Kafka messages.

This work was conceptually inspired by the Go scheduler github.com/etf1/kafka-message-scheduler, though I did not look at its implementation, only at the concept of daily in-memory scheduling.
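To make the envelope concept concrete, the sketch below wraps a regular Kafka message for later dispatch. The header names, key format, and JSON layout here are illustrative assumptions for this sketch, not Karafka's exact wire format:

```ruby
# Minimal sketch of the envelope concept: the original message is carried
# as the payload of a wrapper record targeted at the scheduling topic,
# with the dispatch time stored in headers. Header names and the JSON
# layout are illustrative assumptions, not Karafka's exact format.
require 'json'

def wrap_for_schedule(message:, epoch:, envelope_topic:)
  {
    topic: envelope_topic,
    # Key ties the envelope to the original message so it can be cancelled
    key: "schedule_#{message[:topic]}_#{message[:key]}",
    payload: JSON.generate(message),
    headers: {
      'schedule_schema_version' => '1.0.0',
      'schedule_target_epoch' => epoch.to_s
    }
  }
end

enveloped = wrap_for_schedule(
  message: { topic: 'events', key: 'user-1', payload: 'data' },
  epoch: Time.now.to_i + 900, # dispatch in 15 minutes
  envelope_topic: 'scheduled_messages'
)
```

The enveloped hash can then be produced like any other Kafka message; the scheduled messages consumer deserializes it and dispatches the inner message once the target epoch passes.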

Defined Under Namespace

Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup
Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker

Constant Summary collapse

SCHEMA_VERSION =

Version of the schema we use for envelopes in scheduled messages. We use it to detect potential upgrades, similar to other components of Karafka, and to stop processing of incompatible versions.

'1.0.0'
STATES_SCHEMA_VERSION =

Version of the states schema. Used to publish simple per-partition aggregated metrics that can be used for schedules reporting.

'1.0.0'
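These constants gate processing of incompatible envelopes. A minimal sketch of such a check follows; the `Gem::Version` comparison strategy is an illustrative assumption, not necessarily what the actual SchemaValidator does:

```ruby
# Sketch of schema version gating: stop processing when an envelope was
# produced with a newer schema than this consumer understands. Comparing
# via Gem::Version is an illustrative choice for this sketch only.
SUPPORTED_SCHEMA_VERSION = '1.0.0'

def compatible_schema?(message_version)
  Gem::Version.new(message_version) <= Gem::Version.new(SUPPORTED_SCHEMA_VERSION)
end

compatible_schema?('1.0.0') # => true
compatible_schema?('2.0.0') # => false
```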

Class Method Summary collapse

Class Method Details

.cancel(**kwargs) ⇒ Hash

Generates a tombstone message to cancel a given dispatch (if it has not yet happened)

Parameters:

  • kwargs (Hash)

    keyword arguments required by the proxy

Returns:

  • (Hash)

    tombstone cancelling message



# File 'lib/karafka/pro/scheduled_messages.rb', line 36

def cancel(**kwargs)
  Proxy.cancel(**kwargs)
end
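A tombstone is conventionally a record with the same key as the original scheduled message and a nil payload, so the dispatcher drops the pending dispatch. The key format below is an assumption for the sketch, not Karafka's exact cancellation format:

```ruby
# Sketch of a cancellation tombstone: same key as the original scheduled
# message, nil payload. The key layout is an illustrative assumption.
def cancel_tombstone(key:, envelope_topic:)
  {
    topic: envelope_topic,
    key: key,
    payload: nil
  }
end

tombstone = cancel_tombstone(
  key: 'schedule_events_user-1',
  envelope_topic: 'scheduled_messages'
)
```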

.post_fork(config, pre_fork_producer) ⇒ Object

Since custom producers may be configured that differ from the default one, we hold a reference to the old pre-fork producer. When the producer is re-initialized post-fork, as long as the user relies on the defaults, we re-inherit it from the default config.

Parameters:

  • config (Karafka::Core::Configurable::Node)
  • pre_fork_producer (WaterDrop::Producer)


# File 'lib/karafka/pro/scheduled_messages.rb', line 67

def post_fork(config, pre_fork_producer)
  return unless config.scheduled_messages.producer == pre_fork_producer

  config.scheduled_messages.producer = config.producer
end
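The guard clause above reassigns the producer only when the user had kept the default one. A self-contained illustration of that identity check, using Struct stand-ins in place of the real config objects:

```ruby
# Stand-in objects illustrating the post_fork logic: only when the
# scheduled-messages producer still points at the old (pre-fork) default
# producer do we swap in the freshly re-created default producer.
Config = Struct.new(:producer, :scheduled_messages)
Scoped = Struct.new(:producer)

pre_fork_producer = Object.new
config = Config.new(Object.new, Scoped.new(pre_fork_producer))

# Mirrors the body of ScheduledMessages.post_fork
if config.scheduled_messages.producer == pre_fork_producer
  config.scheduled_messages.producer = config.producer
end

config.scheduled_messages.producer.equal?(config.producer) # => true
```

Had the user assigned a custom producer, the equality check would fail and the custom producer would be left untouched.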

.post_setup(config) ⇒ Object

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 53

def post_setup(config)
  ScheduledMessages::Contracts::Config.new.validate!(
    config.to_h,
    scope: %w[config]
  )
end
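post_setup runs the config contract against the configuration hash. A toy version of such a validate!-style check in plain Ruby follows; the required keys and error handling are assumptions for the sketch, the real Contracts::Config is richer:

```ruby
# Toy contract: raise when required scheduled-messages settings are
# missing. Only illustrates the validate!(hash, scope:) API shape;
# the key names are assumptions, not the real contract's rules.
class ToyConfigContract
  REQUIRED = %i[consumer_class dispatcher_class].freeze

  def validate!(hash, scope: [])
    missing = REQUIRED.reject { |key| hash.key?(key) }
    return true if missing.empty?

    raise ArgumentError, "#{scope.join('.')}: missing #{missing.join(', ')}"
  end
end
```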

.pre_setup(config) ⇒ Object

Sets up the additional config scope and validations for this feature

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 45

def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:scheduled_messages, default: Setup::Config.config)
  end
end

.schedule(**kwargs) ⇒ Hash

Wraps the message via Proxy.schedule

Parameters:

  • kwargs (Hash)

    keyword arguments required by the proxy

Returns:

  • (Hash)

    message wrapped with the scheduled message envelope



29
30
31
# File 'lib/karafka/pro/scheduled_messages.rb', line 29

def schedule(**kwargs)
  Proxy.schedule(**kwargs)
end