Class: Karafka::Pro::RecurringTasks::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/pro/recurring_tasks/task.rb

Overview

Represents a single recurring task that can be executed when the time comes. Tasks should be lightweight. Anything heavy should be executed by scheduling appropriate jobs here.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, cron:, previous_time: 0, enabled: true, &block) ⇒ Task

Returns a new instance of Task.

Parameters:

  • id (String)

    unique id. If re-used between versions, will replace older occurrence.

  • cron (String)

    cron expression matching this task expected execution times.

  • previous_time (Integer, Time) (defaults to: 0)

    previous time this task was executed. 0 if never.

  • enabled (Boolean) (defaults to: true)

    should this task be enabled. Users may disable given task temporarily, this is why we need to know that.

  • block (Proc)

    code to execute.

[View source]

32
33
34
35
36
37
38
39
40
41
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 32

def initialize(id:, cron:, previous_time: 0, enabled: true, &block)
  @id = id
  @cron = ::Fugit::Cron.do_parse(cron)
  @previous_time = previous_time
  @start_time = Time.now
  @executable = block
  @enabled = enabled
  @trigger = false
  @changed = false
end

Instance Attribute Details

#cronFugit::Cron (readonly)

Returns cron from parsing the raw cron expression.

Returns:

  • (Fugit::Cron)

    cron from parsing the raw cron expression


21
22
23
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 21

def cron
  @cron
end

#idString (readonly)

Returns this task id.

Returns:

  • (String)

    this task id


18
19
20
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 18

def id
  @id
end

#previous_timeObject

Allows for update of previous time when restoring the materialized state


24
25
26
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 24

def previous_time
  @previous_time
end

Instance Method Details

#callObject

Executes the given task and publishes appropriate notification bus events.

[View source]

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 87

def call
  monitor.instrument(
    'recurring_tasks.task.executed',
    task: self
  ) do
    # We check for presence of the `@executable` because user can define cron schedule
    # without the code block
    return unless @executable

    execute
  end
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    task: self,
    type: 'recurring_tasks.task.execute.error'
  )
ensure
  @trigger = false
  @previous_time = Time.now
end

#call?Boolean

Returns should we execute this task at this moment in time.

Returns:

  • (Boolean)

    should we execute this task at this moment in time

[View source]

77
78
79
80
81
82
83
84
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 77

def call?
  return true if @trigger
  return false unless enabled?

  # Ensure the job is only due if current_time is strictly after the next_time
  # Please note that we can only compare eorbi against time and not the other way around
  next_time <= Time.now
end

#changed?Boolean

Returns true if anything in the task has changed and we should persist it.

Returns:

  • (Boolean)

    true if anything in the task has changed and we should persist it

[View source]

44
45
46
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 44

def changed?
  @changed
end

#clearObject

Removes the changes indicator flag

[View source]

118
119
120
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 118

def clear
  @changed = false
end

#disableObject

Disables this task execution indefinitely

[View source]

49
50
51
52
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 49

def disable
  touch
  @enabled = false
end

#enableObject

Enables back this task

[View source]

55
56
57
58
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 55

def enable
  touch
  @enabled = true
end

#enabled?Boolean

Returns is this an executable task.

Returns:

  • (Boolean)

    is this an executable task

[View source]

61
62
63
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 61

def enabled?
  @enabled
end

#executeObject

Runs the executable block without any instrumentation or error handling. Useful for debugging and testing

[View source]

113
114
115
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 113

def execute
  @executable.call
end

#next_timeEtOrbi::EoTime

Returns next execution time.

Returns:

  • (EtOrbi::EoTime)

    next execution time

[View source]

72
73
74
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 72

def next_time
  @cron.next_time(@previous_time.to_i.zero? ? @start_time : @previous_time)
end

#to_hHash

Returns hash version of the task. Used for contract validation.

Returns:

  • (Hash)

    hash version of the task. Used for contract validation.

[View source]

123
124
125
126
127
128
129
130
131
132
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 123

def to_h
  {
    id: id,
    cron: @cron.original,
    previous_time: previous_time,
    next_time: next_time,
    changed: changed?,
    enabled: enabled?
  }
end

#triggerObject

Triggers the execution of this task at the earliest opportunity

[View source]

66
67
68
69
# File 'lib/karafka/pro/recurring_tasks/task.rb', line 66

def trigger
  touch
  @trigger = true
end