Class: Karafka::Pro::Processing::Coordinator

Inherits:
Karafka::Processing::Coordinator
Extended by:
Forwardable
Defined in:
lib/karafka/pro/processing/coordinator.rb

Overview

Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition.

Instance Attribute Summary

Attributes inherited from Karafka::Processing::Coordinator

#eofed, #partition, #pause_tracker, #seek_offset, #topic

Instance Method Summary

Methods inherited from Karafka::Processing::Coordinator

#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize

Constructor Details

#initialize(*args) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • args (Object)

    anything the base coordinator accepts



# File 'lib/karafka/pro/processing/coordinator.rb', line 27

def initialize(*args)
  super

  @executed = []
  @errors_tracker = Coordinators::ErrorsTracker.new
  @flow_mutex = Mutex.new
  # Lock for user code synchronization
  # We keep the coordinator lock separate from the user lock so that a lock imposed by
  # the user cannot block the internal operations of Karafka
  # This shared lock is not used internally by the framework, so the end user can rely
  # on it for user-facing locking
  @shared_mutex = Mutex.new
  @collapser = Collapser.new
  @filter = Coordinators::FiltersApplier.new(self)

  return unless topic.virtual_partitions?

  @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
    topic.name,
    partition,
    topic.virtual_partitions.
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end

Instance Attribute Details

#errors_tracker ⇒ Object (readonly)

Returns the value of attribute errors_tracker.



# File 'lib/karafka/pro/processing/coordinator.rb', line 24

def errors_tracker
  @errors_tracker
end

#filter ⇒ Object (readonly)

Returns the value of attribute filter.



# File 'lib/karafka/pro/processing/coordinator.rb', line 24

def filter
  @filter
end

#shared_mutex ⇒ Object (readonly)

Returns the value of attribute shared_mutex.



# File 'lib/karafka/pro/processing/coordinator.rb', line 24

def shared_mutex
  @shared_mutex
end
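
A minimal sketch of how user code might rely on this lock when several consumers of the same partition update shared state in parallel. `FakeCoordinator` is a hypothetical stand-in that exposes only `shared_mutex`; it is not the Karafka class, and the `totals` hash is invented for illustration.

```ruby
# Hypothetical stand-in that exposes only the user-facing lock
class FakeCoordinator
  attr_reader :shared_mutex

  def initialize
    @shared_mutex = Mutex.new
  end
end

coordinator = FakeCoordinator.new
totals = Hash.new(0)

# Four threads play the role of consumers working in parallel on one partition
threads = Array.new(4) do
  Thread.new do
    250.times do
      # Guard the shared user state with the shared lock
      coordinator.shared_mutex.synchronize { totals[:processed] += 1 }
    end
  end
end

threads.each(&:join)
```

Because the lock is separate from the coordinator's internal locks, holding it in user code should not stall the framework's own bookkeeping.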

#virtual_offset_manager ⇒ Object (readonly)

Returns the value of attribute virtual_offset_manager.



# File 'lib/karafka/pro/processing/coordinator.rb', line 24

def virtual_offset_manager
  @virtual_offset_manager
end

Instance Method Details

#active_within?(interval) ⇒ Boolean

Note:

Also returns true if the partition is currently active.

Returns whether this partition was active within the last interval milliseconds.

Parameters:

  • interval (Integer)

    length of the activity window in milliseconds

Returns:

  • (Boolean)

    whether this partition was active within the last interval milliseconds



# File 'lib/karafka/pro/processing/coordinator.rb', line 150

def active_within?(interval)
  # It's always active if there's any job related to this coordinator that is still
  # enqueued or running
  return true if @running_jobs.values.any?(&:positive?)

  # Otherwise we check last time any job of this coordinator was active
  @changed_at + interval > monotonic_now
end
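
The activity check can be sketched in isolation as follows. `ActivityTracker` is a hypothetical stand-in, and it assumes that the job counters are integers and that the timestamp is refreshed on every counter change; neither detail is shown on this page. The sketch passes `&:positive?` as a block so that each counter is tested with `positive?`, since a bare symbol argument to `any?` is treated as a pattern matched with `===`.

```ruby
# Hypothetical stand-in mirroring the activity check
class ActivityTracker
  def initialize
    @running_jobs = Hash.new(0)
    @changed_at = monotonic_now
  end

  # Assumption: every counter change also refreshes the activity timestamp
  def increment(type)
    @running_jobs[type] += 1
    @changed_at = monotonic_now
  end

  def decrement(type)
    @running_jobs[type] -= 1
    @changed_at = monotonic_now
  end

  def active_within?(interval)
    # Any enqueued or running job means the partition is active right now
    return true if @running_jobs.values.any?(&:positive?)

    # Otherwise compare the last activity time against the window
    @changed_at + interval > monotonic_now
  end

  private

  # Monotonic clock in milliseconds, immune to wall-clock adjustments
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end
```

A monotonic clock is used here because wall-clock time can jump backwards or forwards, which would make interval comparisons unreliable.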

#failure!(consumer, error) ⇒ Object

Sets the consumer failure status and additionally starts the collapse until the offset that follows the last message of the batch.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error from the failure



# File 'lib/karafka/pro/processing/coordinator.rb', line 89

def failure!(consumer, error)
  super
  @errors_tracker << error
  collapse_until!(@last_message.offset + 1)
end
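
One way to picture the collapse window is the sketch below. `SketchCollapser` is a hypothetical class written for illustration; the internals of the real `Collapser` are not shown on this page, so the comparison logic here is an assumption. It uses the same method names that appear in the listings (`collapse_until!`, `refresh!`, `collapsed?`).

```ruby
# Hypothetical collapser: stays collapsed while a batch starts below the target offset
class SketchCollapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapsed?
    @collapsed
  end

  # Moves the target forward only, so a later failure cannot shrink the window
  def collapse_until!(offset)
    @mutex.synchronize { @until_offset = offset if offset > @until_offset }
  end

  # Re-evaluated at the start of each batch with the first offset of that batch
  def refresh!(first_offset)
    @mutex.synchronize { @collapsed = first_offset < @until_offset }
  end
end

collapser = SketchCollapser.new

# A failure in a batch whose last message has offset 100
collapser.collapse_until!(100 + 1)

# The retry starts again from offset 90, inside the window
collapser.refresh!(90)
retry_state = collapser.collapsed?

# A later batch starts at offset 101, past the window
collapser.refresh!(101)
later_state = collapser.collapsed?
```

Under these assumptions, the retried batch runs collapsed and parallel processing resumes once a batch starts at or beyond the recorded offset.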

#filtered? ⇒ Boolean

Returns whether any of the filters applied logic that would cause us to run the filtering flow.

Returns:

  • (Boolean)

    whether any of the filters applied logic that would cause us to run the filtering flow



# File 'lib/karafka/pro/processing/coordinator.rb', line 97

def filtered?
  @filter.applied?
end

#finished? ⇒ Boolean

Note:

Used only in the consume operation context

Returns whether the coordinated work is finished.

Returns:

  • (Boolean)

    whether the coordinated work is finished



# File 'lib/karafka/pro/processing/coordinator.rb', line 103

def finished?
  @running_jobs[:consume].zero?
end

#on_enqueued ⇒ Object

Runs synchronized code once for a collective of virtual partitions, before the work is enqueued.



# File 'lib/karafka/pro/processing/coordinator.rb', line 109

def on_enqueued
  @flow_mutex.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end

#on_finished ⇒ Object

Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once for all the coordinated jobs and should be used for any post-job coordination processing.



# File 'lib/karafka/pro/processing/coordinator.rb', line 129

def on_finished
  @flow_mutex.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
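
The run-once behaviour shared by the `on_*` hooks can be sketched as below. `OnceCoordinator` is a hypothetical stand-in, and its `executable?` helper is an assumption about how the once-only check might work, since that method is not shown on this page. For brevity the sketch also guards the job counter with the same mutex, which is a simplification.

```ruby
# Hypothetical stand-in showing a once-only hook gated on a job counter
class OnceCoordinator
  def initialize
    @flow_mutex = Mutex.new
    @executed = []
    @running = 0
  end

  def increment
    @flow_mutex.synchronize { @running += 1 }
  end

  def decrement
    @flow_mutex.synchronize { @running -= 1 }
  end

  def on_finished
    @flow_mutex.synchronize do
      # Skip while any job is still running
      return unless @running.zero?
      # Skip if the hook has already fired
      return unless executable?(:on_finished)

      yield
    end
  end

  private

  # Assumed helper: true only the first time a given action is requested
  def executable?(action)
    return false if @executed.include?(action)

    @executed << action
    true
  end
end

coordinator = OnceCoordinator.new
3.times { coordinator.increment }

calls = Queue.new # thread-safe collector

threads = Array.new(3) do
  Thread.new do
    coordinator.decrement
    coordinator.on_finished { calls << :done }
  end
end

threads.each(&:join)
```

Every job calls `on_finished` when it completes, yet only the first call made after the counter reaches zero runs the block.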

#on_revoked ⇒ Object

Runs once after a partition is revoked



# File 'lib/karafka/pro/processing/coordinator.rb', line 139

def on_revoked
  @flow_mutex.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end

#on_started ⇒ Object

Runs the given code only once for all the coordinated jobs, when the first of them starts.



# File 'lib/karafka/pro/processing/coordinator.rb', line 118

def on_started
  @flow_mutex.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end

#start(messages) ⇒ Object

Starts the coordination process

Parameters:

  • messages

    batch of messages for which the coordination starts



# File 'lib/karafka/pro/processing/coordinator.rb', line 61

def start(messages)
  super

  @collapser.refresh!(messages.first.offset)

  @filter.apply!(messages)

  # Do not clear coordinator errors storage when we are retrying, so we can reference the
  # errors that have happened during recovery. This can be useful for implementing custom
  # flows. There can be more errors than one when running with virtual partitions so we
  # need to make sure we collect them all. Under collapse when we reference a given
  # consumer we should be able to get all the errors and not just first/last.
  #
  # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
  @errors_tracker.clear if attempt.zero?
  @executed.clear

  # We keep the old processed offsets until the collapsing is done and regular processing
  # with virtualization is restored
  @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

  @last_message = messages.last
end
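
The error-retention rule described in the comments above can be sketched on its own. `RetryAwareErrors` is a hypothetical class, and passing `attempt` in as an argument is an assumption made to keep the example self-contained; how the coordinator obtains the attempt number is not shown on this page.

```ruby
# Hypothetical stand-in: errors survive retries and reset only on a fresh start
class RetryAwareErrors
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Attempt 0 means a fresh start, because attempt 1 has not begun yet
  def start(attempt)
    @errors.clear if attempt.zero?
  end

  def failure!(error)
    @errors << error
  end
end

tracker = RetryAwareErrors.new

tracker.start(0)
tracker.failure!(StandardError.new("first failure"))

# Retry: the earlier error stays available for custom recovery flows
tracker.start(1)
tracker.failure!(StandardError.new("second failure"))
errors_during_retry = tracker.errors.map(&:message)

# Fresh start after recovery: the storage is cleared
tracker.start(0)
errors_after_reset = tracker.errors.dup
```

Keeping the errors across retries lets recovery code inspect everything that failed, not only the most recent error.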