Class: Karafka::Pro::Processing::Coordinator

Inherits:
Karafka::Processing::Coordinator
Extended by:
Forwardable
Defined in:
lib/karafka/pro/processing/coordinator.rb

Overview

Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition.

Instance Attribute Summary

Attributes inherited from Karafka::Processing::Coordinator

#eofed, #last_polled_at, #partition, #pause_tracker, #seek_offset, #topic

Instance Method Summary

Methods inherited from Karafka::Processing::Coordinator

#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize

Constructor Details

#initialize(*args) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • args (Object)

    anything the base coordinator accepts



# File 'lib/karafka/pro/processing/coordinator.rb', line 19

def initialize(*args)
  super

  @executed = []
  @errors_tracker = Coordinators::ErrorsTracker.new
  @flow_mutex = Mutex.new
  # Lock for user code synchronization
  # We do not want to mix coordinator lock with the user lock not to create cases where
  # user imposed lock would lock the internal operations of Karafka
  # This shared lock can be used by the end user as it is not used internally by the
  # framework and can be used for user-facing locking
  @shared_mutex = Mutex.new
  @collapser = Collapser.new
  @filter = Coordinators::FiltersApplier.new(self)

  return unless topic.virtual_partitions?

  @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
    topic.name,
    partition,
    topic.virtual_partitions.
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end
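
The comments in the constructor describe the shared mutex as a lock reserved for user code, kept separate from the framework's own locks. A minimal sketch of that usage, assuming several workers share one coordinator; FakeCoordinator is a hypothetical stand-in and not part of Karafka:

```ruby
# Hypothetical stand-in for the coordinator: only the user-facing lock is modeled.
FakeCoordinator = Struct.new(:shared_mutex)

coordinator = FakeCoordinator.new(Mutex.new)
totals = Hash.new(0)

# Four threads play the role of consumers working in parallel on one partition.
threads = 4.times.map do
  Thread.new do
    250.times do
      # Serialize access to shared state through the user-facing lock only
      coordinator.shared_mutex.synchronize { totals[:processed] += 1 }
    end
  end
end

threads.each(&:join)
puts totals[:processed] # prints 1000
```

Because the lock is not used by the framework itself, holding it for a long time in user code should only delay other user code that synchronizes on the same lock.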

Instance Attribute Details

#errors_tracker ⇒ Object (readonly)

Returns the value of attribute errors_tracker.



# File 'lib/karafka/pro/processing/coordinator.rb', line 16

def errors_tracker
  @errors_tracker
end

#filter ⇒ Object (readonly)

Returns the value of attribute filter.



# File 'lib/karafka/pro/processing/coordinator.rb', line 16

def filter
  @filter
end

#shared_mutex ⇒ Object (readonly)

Returns the value of attribute shared_mutex.



# File 'lib/karafka/pro/processing/coordinator.rb', line 16

def shared_mutex
  @shared_mutex
end

#virtual_offset_manager ⇒ Object (readonly)

Returns the value of attribute virtual_offset_manager.



# File 'lib/karafka/pro/processing/coordinator.rb', line 16

def virtual_offset_manager
  @virtual_offset_manager
end

Instance Method Details

#active_within?(interval) ⇒ Boolean

Note:

Also returns true if the partition is currently active.

Returns whether this partition was active within the last interval milliseconds.

Parameters:

  • interval (Integer)

    milliseconds of activity

Returns:

  • (Boolean)

    whether this partition was active within the last interval milliseconds



# File 'lib/karafka/pro/processing/coordinator.rb', line 142

def active_within?(interval)
  # It's always active if there's any job related to this coordinator that is still
  # enqueued or running
  return true if @running_jobs.values.any?(&:positive?)

  # Otherwise we check last time any job of this coordinator was active
  @changed_at + interval > monotonic_now
end
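
A self-contained sketch of the same two-step check, written as a free function so it can be run in isolation. The helper monotonic_now_ms and the explicit running_jobs and changed_at parameters are assumptions made for illustration; in the class these come from instance state. The sketch uses the block form any?(&:positive?), since a bare symbol argument is treated as a pattern and compared with ===, which never matches an integer:

```ruby
# A bare symbol is a pattern (:positive? === value), so it never matches integers.
puts [0, 2].any?(:positive?)  # prints false
# The block form calls positive? on each element.
puts [0, 2].any?(&:positive?) # prints true

# Assumed helper: monotonic clock reading in milliseconds
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# running_jobs: Hash of job type => count of enqueued or running jobs
# changed_at:   monotonic timestamp (ms) of the last activity
# interval:     activity window in milliseconds
def active_within?(running_jobs, changed_at, interval)
  # Any enqueued or running job means the partition is active right now
  return true if running_jobs.values.any?(&:positive?)

  # Otherwise compare the last activity time against the window
  changed_at + interval > monotonic_now_ms
end

now = monotonic_now_ms
puts active_within?({ consume: 1 }, now - 10_000, 100) # running job
puts active_within?({ consume: 0 }, now - 10_000, 100) # idle for too long
```

A monotonic clock is used instead of wall-clock time so that system clock adjustments cannot make an idle partition look active, or the reverse.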

#failure!(consumer, error) ⇒ Object

Sets the consumer failure status and additionally starts the collapse until the offset that follows the last message of the current batch.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error from the failure



# File 'lib/karafka/pro/processing/coordinator.rb', line 81

def failure!(consumer, error)
  super
  @errors_tracker << error
  collapse_until!(@last_message.offset + 1)
end
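
The boundary passed to collapse_until! is the last message offset plus one, which suggests the collapse is meant to cover every message up to and including the last one of the failed batch. A minimal sketch of that idea, assuming a collapser that compares the first offset of each batch against a stored boundary. SketchCollapser and its internals are hypothetical; the real Collapser class is not shown on this page:

```ruby
# Hypothetical stand-in; it only models the boundary comparison.
class SketchCollapser
  def initialize
    @until_offset = -1
    @collapsed = false
  end

  def collapsed?
    @collapsed
  end

  # Keep the furthest boundary requested so far
  def collapse_until!(offset)
    @until_offset = offset if offset > @until_offset
  end

  # Re-evaluate at the start of each batch, using its first offset
  def refresh!(first_offset)
    @collapsed = first_offset < @until_offset
  end
end

collapser = SketchCollapser.new

# A batch covering offsets 10..19 fails, so the boundary becomes 19 + 1
collapser.collapse_until!(19 + 1)

collapser.refresh!(10) # retry of the failed batch
retry_collapsed = collapser.collapsed?

collapser.refresh!(20) # first batch past the failed one
next_collapsed = collapser.collapsed?

puts retry_collapsed # prints true
puts next_collapsed  # prints false
```

With an exclusive boundary, the retry of the failed batch stays collapsed while the first batch that starts past it is processed normally again.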

#filtered? ⇒ Boolean

Returns whether any of the filters applied logic that would cause us to run the filtering flow.

Returns:

  • (Boolean)

    whether any of the filters applied logic that would cause us to run the filtering flow



# File 'lib/karafka/pro/processing/coordinator.rb', line 89

def filtered?
  @filter.applied?
end

#finished? ⇒ Boolean

Note:

Used only in the consume operation context

Returns whether the coordinated work is finished.

Returns:

  • (Boolean)

    whether the coordinated work is finished



# File 'lib/karafka/pro/processing/coordinator.rb', line 95

def finished?
  @running_jobs[:consume].zero?
end

#on_enqueued ⇒ Object

Runs synchronized code once for a collective of virtual partitions prior to work being enqueued



# File 'lib/karafka/pro/processing/coordinator.rb', line 101

def on_enqueued
  @flow_mutex.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end

#on_finished ⇒ Object

Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once for all the coordinated jobs and should be used for any post-job coordination processing.



# File 'lib/karafka/pro/processing/coordinator.rb', line 121

def on_finished
  @flow_mutex.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
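
The on_enqueued, on_started, on_finished and on_revoked hooks share one pattern: take the flow mutex, ask whether the action may still run, and yield at most once. A runnable sketch of that pattern for the finished case follows. RunOnceCoordinator, its job counter, and this implementation of executable? are hypothetical, since the real helper is not shown on this page:

```ruby
# Hypothetical reduction of the run-once pattern; not the Karafka class itself.
class RunOnceCoordinator
  def initialize(jobs)
    @flow_mutex = Mutex.new
    @executed = []
    @running = jobs
  end

  # Marks one coordinated job as done
  def decrement
    @flow_mutex.synchronize { @running -= 1 }
  end

  # Yields only when all jobs are done, and only the first time it is asked
  def on_finished
    @flow_mutex.synchronize do
      return unless @running.zero?
      return unless executable?(:on_finished)

      yield
    end
  end

  private

  # Assumed behavior: true only on the first request for a given action.
  # Always called with @flow_mutex held, so it needs no lock of its own.
  def executable?(action)
    return false if @executed.include?(action)

    @executed << action
    true
  end
end

coordinator = RunOnceCoordinator.new(3)
calls = 0

threads = 3.times.map do
  Thread.new do
    coordinator.decrement
    # Every worker asks; the block runs once, under the flow mutex
    coordinator.on_finished { calls += 1 }
  end
end

threads.each(&:join)
puts calls # prints 1
```

Checking the job counter and recording the action under the same lock is what keeps two workers that finish at nearly the same moment from both running the block.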

#on_revoked ⇒ Object

Runs once after a partition is revoked



# File 'lib/karafka/pro/processing/coordinator.rb', line 131

def on_revoked
  @flow_mutex.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end

#on_started ⇒ Object

Runs the given code only once for all the coordinated jobs, when the first of them starts.



# File 'lib/karafka/pro/processing/coordinator.rb', line 110

def on_started
  @flow_mutex.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end

#start(messages) ⇒ Object

Starts the coordination process

Parameters:

  • messages

    batch of messages to coordinate; the method reads its first and last elements



# File 'lib/karafka/pro/processing/coordinator.rb', line 53

def start(messages)
  super

  @collapser.refresh!(messages.first.offset)

  @filter.apply!(messages)

  # Do not clear coordinator errors storage when we are retrying, so we can reference the
  # errors that have happened during recovery. This can be useful for implementing custom
  # flows. There can be more errors than one when running with virtual partitions so we
  # need to make sure we collect them all. Under collapse when we reference a given
  # consumer we should be able to get all the errors and not just first/last.
  #
  # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
  @errors_tracker.clear if attempt.zero?
  @executed.clear

  # We keep the old processed offsets until the collapsing is done and regular processing
  # with virtualization is restored
  @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

  @last_message = messages.last
end