Module: Karafka::Pro::ScheduledMessages
- Defined in:
- lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb
Overview
This feature allows for proxying messages via a special topic that can dispatch them at a later time, hence scheduled messages. Such messages need to have a special format but aside from that they are regular Kafka messages.
This work was conceptually inspired by the Go scheduler: github.com/etf1/kafka-message-scheduler though I did not look at the implementation itself. Just the concept of daily in-memory scheduling.
Defined Under Namespace
Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker
Constant Summary collapse
- SCHEMA_VERSION =
Version of the schema we use for envelops in scheduled messages. We use it to detect any potential upgrades similar to other components of Karafka and to stop processing of incompatible versions
'1.0.0'
- STATES_SCHEMA_VERSION =
Version of the states schema. Used to publish per partition simple aggregated metrics that can be used for schedules reporting
'1.0.0'
Class Method Summary collapse
-
.cancel(**kwargs) ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened).
-
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer.
- .post_setup(config) ⇒ Object
-
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things.
-
.schedule(**kwargs) ⇒ Hash
Runs the
Proxy.call
.
Class Method Details
.cancel(**kwargs) ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened)
36 37 38 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 36 def cancel(**kwargs) Proxy.cancel(**kwargs) end |
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer. This means, that when we initialize it again in post-fork, as long as user uses defaults we should re-inherit it from the default config.
67 68 69 70 71 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 67 def post_fork(config, pre_fork_producer) return unless config..producer == pre_fork_producer config..producer = config.producer end |
.post_setup(config) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 53 def post_setup(config) ScheduledMessages::Contracts::Config.new.validate!( config.to_h, scope: %w[config] ) end |
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things
45 46 47 48 49 50 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 45 def pre_setup(config) # Expand the config with this feature specific stuff config.instance_eval do setting(:scheduled_messages, default: Setup::Config.config) end end |