Class: Karafka::Pro::RecurringTasks::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/pro/recurring_tasks/executor.rb

Overview

Executor is responsible for management of the state of recurring tasks schedule and is the heart of recurring tasks. It coordinates the replaying process as well as tracking of data on changes.

Constant Summary collapse

COMMANDS =

Task commands we support and that can be triggered on tasks (if matched)

%w[
  disable
  enable
  trigger
].freeze

Instance Method Summary collapse

Constructor Details

#initializeExecutor

Returns a new instance of Executor.



20
21
22
23
24
25
26
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 20

def initialize
  @replaying = true
  @incompatible = false
  @catchup_commands = []
  @catchup_schedule = nil
  @matcher = Matcher.new
end

Instance Method Details

#apply_command(command_hash) ⇒ Object

Applies given command to task (or many tasks) by running the command on tasks that match

Parameters:

  • command_hash (Hash)

    deserialized command data

Raises:



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 41

def apply_command(command_hash)
  cmd_name = command_hash[:command][:name]

  raise(Karafka::Errors::UnsupportedCaseError, cmd_name) unless COMMANDS.include?(cmd_name)

  schedule.each do |task|
    next unless @matcher.matches?(task, command_hash)

    task.public_send(cmd_name)
  end
end

#callObject

Run all tasks that should run at a given time and if any tasks were changed in any way or executed, stores the most recent state back to Kafka



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 111

def call
  changed = false

  schedule.each do |task|
    changed = true if task.changed?

    unless task.call?
      task.clear

      next
    end

    changed = true
    task.call
  end

  snapshot if changed
end

#incompatible?Boolean

Returns Is the current process schedule incompatible (older) than the one that we have in memory.

Returns:

  • (Boolean)

    Is the current process schedule incompatible (older) than the one that we have in memory



35
36
37
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 35

def incompatible?
  @incompatible
end

#replayObject

Once all previous data is accumulated runs the catchup process to establish current state of the recurring tasks schedule execution.

It includes applying any requested commands as well as synchronizing execution details for existing schedule and making sure all is loaded correctly.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 64

def replay
  # Ensure replaying happens only once
  return unless @replaying

  @replaying = false

  # When there is nothing to replay and synchronize, we can just save the state and
  # proceed
  if @catchup_commands.empty? && @catchup_schedule.nil?
    snapshot

    return
  end

  # If the schedule version we have in Kafka is higher than ours, we cannot proceed
  # This prevents us from applying older changes to a new schedule
  if @catchup_schedule[:schedule_version] > schedule.version
    @incompatible = true

    return
  end

  # Now we can synchronize the in-memory state based on the last state stored in Kafka
  schedule.each do |task|
    stored_task = @catchup_schedule[:tasks][task.id.to_sym]

    next unless stored_task

    stored_previous_time = stored_task[:previous_time]
    task.previous_time = stored_previous_time.zero? ? 0 : Time.at(stored_previous_time)

    stored_task[:enabled] ? task.enable : task.disable
  end

  @catchup_commands.each do |cmd|
    apply_command(cmd)
  end

  # We make sure to save in Kafka once more once everything is up to date
  snapshot

  @catchup_schedule = nil
  @catchup_commands = []
end

#replaying?Boolean

Returns are we in the replaying phase or not (false means, regular operations).

Returns:

  • (Boolean)

    are we in the replaying phase or not (false means, regular operations)



29
30
31
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 29

def replaying?
  @replaying
end

#update_state(schedule_hash) ⇒ Object

Updates the catchup state

Parameters:

  • schedule_hash (Hash)

    deserialized schedule hash hash



55
56
57
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 55

def update_state(schedule_hash)
  @catchup_schedule = schedule_hash
end