Class: Karafka::Pro::RecurringTasks::Consumer

Inherits:
BaseConsumer
  • Object
Defined in:
lib/karafka/pro/recurring_tasks/consumer.rb

Overview

Consumer responsible for management of the recurring tasks and their execution. There are some assumptions made here that always need to be satisfied:

  • we only run schedules that are of the same or newer version
  • we always mark as consumed in such a way that the first message received after assignment (if any) is a schedule state message
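The first assumption can be illustrated with a minimal sketch. It assumes that schedule versions are dotted version strings and that "compatible" means the locally defined schedule is not older than the one stored in the topic. The `compatible?` helper and its parameter names are hypothetical and are not part of Karafka's API; the executor's actual check may differ.

```ruby
require 'rubygems'

# Hypothetical helper, not part of Karafka: decides whether this process may
# run the schedule, assuming versions are dotted strings such as '1.0.0'.
def compatible?(local_version, stored_version)
  Gem::Version.new(local_version) >= Gem::Version.new(stored_version)
end

compatible?('1.1.0', '1.0.0') # local schedule is newer, safe to run
compatible?('1.0.0', '1.0.0') # same version, safe to run
compatible?('1.0.0', '1.1.0') # stored state comes from a newer schedule
```

Comparing through `Gem::Version` rather than plain strings avoids ordering '1.10.0' before '1.9.0'.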

Instance Attribute Summary

Attributes inherited from BaseConsumer

#client, #coordinator, #id, #messages, #producer

Instance Method Summary

Methods inherited from BaseConsumer

#on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_eofed, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_eofed, #on_idle, #on_revoked, #on_shutdown

Constructor Details

#initialize(*args) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:

  • args (Array)

    all arguments accepted by the consumer



# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 24

def initialize(*args)
  super
  @executor = Executor.new
end

Instance Method Details

#consume ⇒ Object



# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 29

def consume
  # There is nothing we can do if we operate on a newer schedule. In such cases we should
  # just wait and re-raise error hoping someone will notice or that this will be
  # reassigned to a process with newer schedule
  raise(Errors::IncompatibleScheduleError) if @executor.incompatible?

  messages.each do |message|
    payload = message.payload
    type = payload[:type]

    case type
    when 'schedule'
      # If we're replaying data, we need to record the most recent stored state, so we
      # can use this data to fully initialize the scheduler
      @executor.update_state(payload) if @executor.replaying?

      # If this is the first message, there is no previous offset we could mark
      next if message.offset.zero?

      # We always mark as consumed in such a way, that when replaying, we start from a
      # schedule state message. This makes it easier to recover.
      mark_as_consumed Karafka::Messages::Seek.new(
        topic.name,
        partition,
        message.offset - 1
      )
    when 'command'
      @executor.apply_command(payload)

      next if @executor.replaying?

      # Execute on each incoming command to keep latency low, but only after replaying.
      # During replaying we should not execute because there may be more state changes
      # that collectively have a different outcome
      @executor.call
    else
      raise ::Karafka::Errors::UnsupportedCaseError, type
    end
  end

  eofed if eofed?
end
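
The offset-marking rule used for 'schedule' messages can be sketched in isolation. The example below is a simplified model, not Karafka code: `Message` and `resume_offset` are hypothetical names, and it assumes that a consumer with no marked offset starts from offset 0 and that consumption resumes from the message right after the last marked one.

```ruby
# Simplified stand-in for a Kafka message; not Karafka's message class.
Message = Struct.new(:offset, :type)

# Returns the offset a restarted consumer would resume from after processing
# the given messages with the "mark the previous offset" rule.
def resume_offset(messages)
  marked = nil # offset of the last message marked as consumed

  messages.each do |message|
    next unless message.type == 'schedule'
    # The very first message has no previous offset that could be marked
    next if message.offset.zero?

    marked = message.offset - 1
  end

  # Consumption resumes from the message right after the marked one
  marked.nil? ? 0 : marked + 1
end

log = [
  Message.new(0, 'schedule'),
  Message.new(1, 'command'),
  Message.new(2, 'schedule'),
  Message.new(3, 'command')
]

resume = resume_offset(log)                        # => 2
first_after_restart = log.find { |m| m.offset == resume }
first_after_restart.type                           # => "schedule"
```

Under these assumptions, the first message seen after a restart is always a schedule state message, which matches the second assumption listed in the overview.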

#eofed ⇒ Object

Starts the final replay process if we reached EOF during replaying.



# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 73

def eofed
  # We only mark as replayed if we were replaying in the first place
  # If already replayed, nothing to do
  return unless @executor.replaying?

  @executor.replay
end
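
The guard in `#eofed` makes the method safe to call repeatedly. A minimal sketch of that behavior follows; `FakeExecutor` is a hypothetical stand-in, and it assumes that `#replay` ends the replaying phase, which the source above does not show.

```ruby
# Hypothetical stand-in for the executor. Assumes #replay ends the replaying
# phase; the real executor's internals are not shown in this document.
class FakeExecutor
  attr_reader :replays

  def initialize
    @replaying = true
    @replays = 0
  end

  def replaying?
    @replaying
  end

  def replay
    @replays += 1
    @replaying = false
  end
end

# Mirrors the guard used in #eofed
def eofed(executor)
  return unless executor.replaying?

  executor.replay
end

executor = FakeExecutor.new
2.times { eofed(executor) }
executor.replays # => 1, the second call is a no-op
```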

#tick ⇒ Object

Runs the cron execution if all good.



# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 82

def tick
  # Do nothing until we fully recover the correct state
  return if @executor.replaying?

  # If the state is incompatible, we can only raise an error.
  # We do it here and in the `#consume` so the pause-retry kicks in basically reporting
  # on this issue once every minute. That way user should not miss this.
  # We use seek to move so we can achieve a pause of 60 seconds in between consecutive
  # errors instead of on each tick because it is much more frequent.
  if @executor.incompatible?
    if messages.empty?
      raise Errors::IncompatibleScheduleError
    else
      return seek(messages.last.offset - 1)
    end
  end

  # If all good and compatible we can execute the recurring tasks
  @executor.call
end
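
The branching in `#tick` can be summarized as a small decision function. This is an illustrative sketch only: `tick_action`, its keyword arguments, and the returned symbols are hypothetical, and the locally defined `IncompatibleScheduleError` merely stands in for `Errors::IncompatibleScheduleError`.

```ruby
# Hypothetical error standing in for Errors::IncompatibleScheduleError
class IncompatibleScheduleError < StandardError; end

# Returns the action #tick would take for the given state. `offsets` stands in
# for the offsets of the messages in the most recent batch.
def tick_action(replaying:, incompatible:, offsets:)
  # Nothing to do until the state is fully recovered
  return :wait if replaying
  # Compatible state: run the recurring tasks
  return :execute unless incompatible

  # Incompatible and no messages to seek back to: report the problem directly
  raise IncompatibleScheduleError if offsets.empty?

  # Incompatible with messages available: seek back instead of raising
  [:seek, offsets.last - 1]
end

tick_action(replaying: true, incompatible: false, offsets: [])      # => :wait
tick_action(replaying: false, incompatible: false, offsets: [])     # => :execute
tick_action(replaying: false, incompatible: true, offsets: [5, 6])  # => [:seek, 5]
```

As the source comments explain, seeking instead of raising on every tick is what spaces the reported errors roughly a minute apart.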