Module: Karafka::Pro::RecurringTasks
- Defined in:
- lib/karafka/pro/recurring_tasks.rb,
lib/karafka/pro/recurring_tasks/task.rb,
lib/karafka/pro/recurring_tasks/errors.rb,
lib/karafka/pro/recurring_tasks/matcher.rb,
lib/karafka/pro/recurring_tasks/consumer.rb,
lib/karafka/pro/recurring_tasks/executor.rb,
lib/karafka/pro/recurring_tasks/listener.rb,
lib/karafka/pro/recurring_tasks/schedule.rb,
lib/karafka/pro/recurring_tasks/dispatcher.rb,
lib/karafka/pro/recurring_tasks/serializer.rb,
lib/karafka/pro/recurring_tasks/deserializer.rb,
lib/karafka/pro/recurring_tasks/setup/config.rb,
lib/karafka/pro/recurring_tasks/contracts/task.rb,
lib/karafka/pro/recurring_tasks/contracts/config.rb
Overview
Recurring tasks functionality
Defined Under Namespace
Modules: Contracts, Errors, Setup
Classes: Consumer, Deserializer, Dispatcher, Executor, Listener, Matcher, Schedule, Serializer, Task
Class Method Summary
- .define(version = '1.0.0') ⇒ Object
Simplified API for defining schedules; also validates the task data.
- .post_fork(config, pre_fork_producer) ⇒ Object
Re-inherits the default producer after a fork when recurring tasks were using the pre-fork default producer.
- .post_setup(config) ⇒ Object
- .pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things.
- .schedule ⇒ Schedule?
Currently defined schedule; an empty one is defined on first access if none exists.
Class Method Details
.define(version = '1.0.0') ⇒ Object
Simplified API for defining schedules. It evaluates the given block in the context of a new Schedule and then validates the data of every task.
# File 'lib/karafka/pro/recurring_tasks.rb', line 30
def define(version = '1.0.0', &)
  @schedule = Schedule.new(version: version)
  @schedule.instance_exec(&)

  @schedule.each do |task|
    Contracts::Task.new.validate!(
      task.to_h,
      scope: ['recurring_tasks', task.id]
    )
  end

  @schedule
end
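The flow of .define (build a schedule, evaluate the block against it, validate each task) can be illustrated with a minimal, self-contained sketch. SketchTask, SketchSchedule, sketch_define and the id check are hypothetical stand-ins invented for this illustration; they are not Karafka's Schedule, Task or Contracts::Task, whose real rules may differ.

```ruby
# Hypothetical stand-ins that mirror the shape of .define; not Karafka's classes.
SketchTask = Struct.new(:id, :cron, :block) do
  def to_h
    { id: id, cron: cron }
  end
end

class SketchSchedule
  include Enumerable

  attr_reader :version

  def initialize(version:)
    @version = version
    @tasks = {}
  end

  # DSL method that becomes callable inside the block through instance_exec
  def schedule(id:, cron:, &block)
    @tasks[id] = SketchTask.new(id, cron, block)
  end

  def each(&block)
    @tasks.each_value(&block)
  end
end

# Same three steps as .define: build, evaluate the block, validate every task
def sketch_define(version = '1.0.0', &block)
  schedule = SketchSchedule.new(version: version)
  schedule.instance_exec(&block)

  schedule.each do |task|
    # Simplified check standing in for Contracts::Task (assumed rule, for illustration only)
    raise ArgumentError, "invalid task id: #{task.id.inspect}" unless task.id.to_s.match?(/\A[\w-]+\z/)
  end

  schedule
end

nightly = sketch_define('1.0.0') do
  schedule(id: 'cleanup', cron: '0 0 * * *') { :cleaned }
end
```

Because the block runs through instance_exec, calls such as schedule(...) resolve against the schedule object itself, which is why no explicit receiver is needed inside the block.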
.post_fork(config, pre_fork_producer) ⇒ Object
Custom producers that differ from the default one may be configured, so a reference to the old pre-fork producer is kept. When the producer is initialized again post-fork, recurring tasks re-inherit it from the default config as long as the user relies on the defaults; a custom producer is left untouched.
# File 'lib/karafka/pro/recurring_tasks.rb', line 93
def post_fork(config, pre_fork_producer)
  return unless config.recurring_tasks.producer == pre_fork_producer

  config.recurring_tasks.producer = config.producer
end
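The guard in .post_fork can be exercised in isolation with a small sketch. RecurringConfig, AppConfig and sketch_post_fork are hypothetical names used only here, and plain Object instances stand in for producers; Karafka's real config is not a Struct.

```ruby
# Hypothetical config objects used only for this illustration
RecurringConfig = Struct.new(:producer)
AppConfig = Struct.new(:producer, :recurring_tasks)

# Same guard as .post_fork: re-inherit only when the pre-fork default was in use
def sketch_post_fork(config, pre_fork_producer)
  return unless config.recurring_tasks.producer == pre_fork_producer

  config.recurring_tasks.producer = config.producer
end

old_default = Object.new # default producer before the fork
new_default = Object.new # default producer re-created after the fork
custom = Object.new      # producer explicitly configured by the user

# Case 1: recurring tasks used the default producer before the fork
defaulted = AppConfig.new(new_default, RecurringConfig.new(old_default))
sketch_post_fork(defaulted, old_default)

# Case 2: a custom producer was configured, so it must be preserved
customized = AppConfig.new(new_default, RecurringConfig.new(custom))
sketch_post_fork(customized, old_default)
```

In the first case the recurring tasks producer ends up pointing at the new default; in the second the custom producer is not replaced.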
.post_setup(config) ⇒ Object
# File 'lib/karafka/pro/recurring_tasks.rb', line 67
def post_setup(config)
  RecurringTasks::Contracts::Config.new.validate!(
    config.to_h,
    scope: %w[config]
  )

  # Published after task is successfully executed
  Karafka.monitor.notifications_bus.register_event('recurring_tasks.task.executed')

  # Initialize empty dummy schedule, so we always have one and so we do not have to
  # deal with a case where there is no schedule
  RecurringTasks.schedule

  # User can disable logging of executions, in which case we don't track them
  return unless recurring_tasks_logging

  Karafka.monitor.subscribe(Listener.new)
end
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things
# File 'lib/karafka/pro/recurring_tasks.rb', line 59
def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:recurring_tasks, default: Setup::Config.config)
  end
end
.schedule ⇒ Schedule?
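Returns the currently defined schedule. If none has been defined yet, the method body shown here defines and returns an empty schedule with version '0.0.0' rather than returning nil.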
# File 'lib/karafka/pro/recurring_tasks.rb', line 16
def schedule
  @schedule || define('0.0.0') { nil }
end
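The fallback in .schedule can be sketched on its own to show that the accessor always yields a schedule object. SketchRecurring and its Struct-based Schedule are hypothetical stand-ins for this illustration, not the Karafka module.

```ruby
# Hypothetical module mirroring the .schedule / .define interplay
module SketchRecurring
  Schedule = Struct.new(:version)

  class << self
    # Stores the new schedule and evaluates the block in its context
    def define(version = '1.0.0', &block)
      @schedule = Schedule.new(version)
      @schedule.instance_exec(&block)
      @schedule
    end

    # Returns the stored schedule, defining an empty '0.0.0' one when missing
    def schedule
      @schedule || define('0.0.0') { nil }
    end
  end
end

first_call = SketchRecurring.schedule  # triggers the empty fallback schedule
second_call = SketchRecurring.schedule # returns the same stored object
```

A later call to define replaces the stored fallback, so subsequent reads through schedule return the user-defined schedule.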