Module: Karafka::Pro::RecurringTasks
- Defined in:
- lib/karafka/pro/recurring_tasks.rb,
lib/karafka/pro/recurring_tasks/task.rb,
lib/karafka/pro/recurring_tasks/errors.rb,
lib/karafka/pro/recurring_tasks/matcher.rb,
lib/karafka/pro/recurring_tasks/consumer.rb,
lib/karafka/pro/recurring_tasks/executor.rb,
lib/karafka/pro/recurring_tasks/listener.rb,
lib/karafka/pro/recurring_tasks/schedule.rb,
lib/karafka/pro/recurring_tasks/dispatcher.rb,
lib/karafka/pro/recurring_tasks/serializer.rb,
lib/karafka/pro/recurring_tasks/deserializer.rb,
lib/karafka/pro/recurring_tasks/setup/config.rb,
lib/karafka/pro/recurring_tasks/contracts/task.rb,
lib/karafka/pro/recurring_tasks/contracts/config.rb
Overview
Recurring tasks functionality
Defined Under Namespace
Modules: Contracts, Errors, Setup Classes: Consumer, Deserializer, Dispatcher, Executor, Listener, Matcher, Schedule, Serializer, Task
Class Method Summary collapse
-
.define(version = '1.0.0', &block) ⇒ Object
Simplified API for schedules definitions and validates the tasks data.
-
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer.
- .post_setup(config) ⇒ Object
-
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things.
-
.schedule ⇒ Schedule?
Current defined schedule or nil if not defined.
Class Method Details
.define(version = '1.0.0', &block) ⇒ Object
Simplified API for schedules definitions and validates the tasks data
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/karafka/pro/recurring_tasks.rb', line 27 def define(version = '1.0.0', &block) @schedule = Schedule.new(version: version) @schedule.instance_exec(&block) @schedule.each do |task| Contracts::Task.new.validate!( task.to_h, scope: ['recurring_tasks', task.id] ) end @schedule end |
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer. This means, that when we initialize it again in post-fork, as long as user uses defaults we should re-inherit it from the default config.
90 91 92 93 94 |
# File 'lib/karafka/pro/recurring_tasks.rb', line 90 def post_fork(config, pre_fork_producer) return unless config.recurring_tasks.producer == pre_fork_producer config.recurring_tasks.producer = config.producer end |
.post_setup(config) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/karafka/pro/recurring_tasks.rb', line 64 def post_setup(config) RecurringTasks::Contracts::Config.new.validate!( config.to_h, scope: %w[config] ) # Published after task is successfully executed Karafka.monitor.notifications_bus.register_event('recurring_tasks.task.executed') # Initialize empty dummy schedule, so we always have one and so we do not have to # deal with a case where there is no schedule RecurringTasks.schedule # User can disable logging of executions, in which case we don't track them return unless Karafka::App.config.recurring_tasks.logging Karafka.monitor.subscribe(Listener.new) end |
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things
56 57 58 59 60 61 |
# File 'lib/karafka/pro/recurring_tasks.rb', line 56 def pre_setup(config) # Expand the config with this feature specific stuff config.instance_eval do setting(:recurring_tasks, default: Setup::Config.config) end end |
.schedule ⇒ Schedule?
Returns current defined schedule or nil if not defined.
12 13 14 |
# File 'lib/karafka/pro/recurring_tasks.rb', line 12 def schedule @schedule || define('0.0.0') {} end |