Class: Karafka::Pro::RecurringTasks::Executor
- Inherits: Object
- Defined in: lib/karafka/pro/recurring_tasks/executor.rb
Overview
The Executor manages the state of the recurring tasks schedule and is the heart of the recurring tasks feature. It coordinates the replaying process and tracks data as it changes.
Constant Summary
- COMMANDS = %w[ disable enable trigger ].freeze
  Task commands we support and that can be triggered on tasks (if matched)
Instance Method Summary
- #apply_command(command_hash) ⇒ Object
  Applies the given command to a task (or many tasks) by running it on every task that matches.
- #call ⇒ Object
  Runs all tasks that are due at the given time and, if any task was changed or executed, stores the most recent state back to Kafka.
- #incompatible? ⇒ Boolean
  Whether the schedule of the current process is incompatible with (older than) the schedule state replayed from Kafka and held in memory.
- #initialize ⇒ Executor (constructor)
  A new instance of Executor.
- #replay ⇒ Object
  Once all previous data is accumulated, runs the catchup process to establish the current state of the recurring tasks schedule execution.
- #replaying? ⇒ Boolean
  Whether we are in the replaying phase (false means regular operations).
- #update_state(schedule_hash) ⇒ Object
  Updates the catchup state.
Constructor Details
Instance Method Details
#apply_command(command_hash) ⇒ Object
Applies the given command to a task (or many tasks) by running it on every task that matches.

    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 41

    def apply_command(command_hash)
      cmd_name = command_hash[:command][:name]

      raise(Karafka::Errors::UnsupportedCaseError, cmd_name) unless COMMANDS.include?(cmd_name)

      schedule.each do |task|
        next unless @matcher.matches?(task, command_hash)

        task.public_send(cmd_name)
      end
    end
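The whitelist-then-dispatch pattern used by #apply_command can be tried in isolation. The following is a minimal sketch, not Karafka's implementation: DemoTask, DemoMatcher, DEMO_COMMANDS, the :task_id key in the command hash, and the "*" wildcard rule are all hypothetical stand-ins. Only the [:command][:name] lookup, the whitelist check, and the public_send dispatch mirror the method shown above.

```ruby
# Hypothetical stand-ins for illustration; none of these are Karafka classes.
DEMO_COMMANDS = %w[disable enable trigger].freeze

class DemoTask
  attr_reader :id, :log

  def initialize(id)
    @id = id
    @enabled = true
    @log = []
  end

  def enable
    @enabled = true
    @log << :enable
  end

  def disable
    @enabled = false
    @log << :disable
  end

  def trigger
    @log << :trigger
  end

  def enabled?
    @enabled
  end
end

# Assumed matching rule: "*" matches every task, otherwise ids must be equal
class DemoMatcher
  def matches?(task, command_hash)
    target = command_hash[:task_id]
    target == '*' || target == task.id
  end
end

def apply_demo_command(tasks, matcher, command_hash)
  cmd_name = command_hash[:command][:name]

  # Reject anything outside the whitelist before it reaches public_send
  raise(ArgumentError, cmd_name) unless DEMO_COMMANDS.include?(cmd_name)

  tasks.each do |task|
    next unless matcher.matches?(task, command_hash)

    task.public_send(cmd_name)
  end
end

demo_tasks = [DemoTask.new('cleanup'), DemoTask.new('report')]
apply_demo_command(demo_tasks, DemoMatcher.new, { command: { name: 'disable' }, task_id: 'cleanup' })
```

Checking the name against a frozen whitelist before calling public_send keeps a malformed or malicious command from invoking an arbitrary public method on the task.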
#call ⇒ Object
Runs all tasks that are due at the given time and, if any task was changed or executed, stores the most recent state back to Kafka.

    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 111

    def call
      changed = false

      schedule.each do |task|
        changed = true if task.changed?

        unless task.call?
          task.clear

          next
        end

        changed = true
        task.call
      end

      snapshot if changed
    end
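The change tracking in #call can be illustrated with a stand-in task. This is a sketch under assumptions: DemoTickTask and demo_tick are hypothetical, and the behavior of clear (resetting the changed flag) is a guess made for the demo rather than something the listing confirms. The helper returns the flag instead of calling snapshot, so the decision to persist state is visible.

```ruby
# Hypothetical stand-in for a recurring task; not a Karafka class.
class DemoTickTask
  attr_reader :runs

  def initialize(due:, changed: false)
    @due = due
    @changed = changed
    @runs = 0
  end

  def changed?
    @changed
  end

  def call?
    @due
  end

  # Assumption for this demo: clearing resets the changed flag
  def clear
    @changed = false
  end

  def call
    @runs += 1
    @due = false
  end
end

# Returns true when a snapshot would be needed after this tick
def demo_tick(tasks)
  changed = false

  tasks.each do |task|
    # A task altered since the last tick (for example by a command) counts as a change
    changed = true if task.changed?

    unless task.call?
      task.clear

      next
    end

    # Executing a task always counts as a change
    changed = true
    task.call
  end

  changed
end

idle = DemoTickTask.new(due: false)
due = DemoTickTask.new(due: true)
needs_snapshot = demo_tick([idle, due])
```

With this shape, a tick in which nothing was due and nothing was altered returns false, so no state would be written.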
#incompatible? ⇒ Boolean
Returns (Boolean): whether the schedule of the current process is incompatible with (older than) the schedule state replayed from Kafka and held in memory.

    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 35

    def incompatible?
      @incompatible
    end
#replay ⇒ Object
Once all previous data is accumulated, runs the catchup process to establish the current state of the recurring tasks schedule execution.
It includes applying any requested commands as well as synchronizing execution details for existing schedule and making sure all is loaded correctly.
    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 64

    def replay
      # Ensure replaying happens only once
      return unless @replaying

      @replaying = false

      # When there is nothing to replay and synchronize, we can just save the state and
      # proceed
      if @catchup_commands.empty? && @catchup_schedule.nil?
        snapshot

        return
      end

      # If the schedule version we have in Kafka is higher than ours, we cannot proceed
      # This prevents us from applying older changes to a new schedule
      if @catchup_schedule[:schedule_version] > schedule.version
        @incompatible = true

        return
      end

      # Now we can synchronize the in-memory state based on the last state stored in Kafka
      schedule.each do |task|
        stored_task = @catchup_schedule[:tasks][task.id.to_sym]

        next unless stored_task

        stored_previous_time = stored_task[:previous_time]
        task.previous_time = stored_previous_time.zero? ? 0 : Time.at(stored_previous_time)

        stored_task[:enabled] ? task.enable : task.disable
      end

      @catchup_commands.each do |cmd|
        apply_command(cmd)
      end

      # We make sure to save in Kafka once more once everything is up to date
      snapshot

      @catchup_schedule = nil
      @catchup_commands = []
    end
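The version guard and the per-task synchronization in #replay can be sketched with plain hashes and a Struct. Everything named Demo* or DEMO_* below is hypothetical, the version strings and timestamps are made up, and the stored-state shape (:schedule_version plus :tasks keyed by task id symbol, each with :previous_time and :enabled) is inferred only from the keys the listing reads. Ruby compares strings lexicographically, which is what the > check does here.

```ruby
# Hypothetical stand-in for a task; not a Karafka class.
DemoScheduledTask = Struct.new(:id, :previous_time, :enabled)

# Made-up stored state, shaped after the keys read in the listing above
DEMO_STORED_STATE = {
  schedule_version: '1.0.0',
  tasks: {
    cleanup: { previous_time: 1_700_000_000, enabled: false },
    report: { previous_time: 0, enabled: true }
  }
}.freeze

# Returns :incompatible when the stored state comes from a newer schedule,
# otherwise copies the stored details onto matching tasks and returns :synced
def demo_sync(tasks, local_version, stored)
  return :incompatible if stored[:schedule_version] > local_version

  tasks.each do |task|
    stored_task = stored[:tasks][task.id.to_sym]

    # Tasks that were never stored keep their in-memory defaults
    next unless stored_task

    stored_previous_time = stored_task[:previous_time]
    # Zero means "never executed", so it is kept as 0 instead of the Unix epoch
    task.previous_time = stored_previous_time.zero? ? 0 : Time.at(stored_previous_time)
    task.enabled = stored_task[:enabled]
  end

  :synced
end

demo_schedule = [
  DemoScheduledTask.new('cleanup', 0, true),
  DemoScheduledTask.new('brand_new', 0, true)
]
demo_result = demo_sync(demo_schedule, '1.0.0', DEMO_STORED_STATE)
```

Returning early on a newer stored version leaves the in-memory tasks untouched, which matches the intent stated in the listing's comment: older changes are never applied on top of a newer schedule.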
#replaying? ⇒ Boolean
Returns (Boolean): whether we are in the replaying phase (false means regular operations).

    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 29

    def replaying?
      @replaying
    end
#update_state(schedule_hash) ⇒ Object
Updates the catchup state.

    # File 'lib/karafka/pro/recurring_tasks/executor.rb', line 55

    def update_state(schedule_hash)
      @catchup_schedule = schedule_hash
    end