Class: Karafka::Pro::Processing::Coordinator
- Inherits:
-
Karafka::Processing::Coordinator
- Object
- Karafka::Processing::Coordinator
- Karafka::Pro::Processing::Coordinator
- Extended by:
- Forwardable
- Defined in:
- lib/karafka/pro/processing/coordinator.rb
Overview
Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition
Instance Attribute Summary
-
#errors_tracker ⇒ Object
readonly
Returns the value of attribute errors_tracker.
-
#filter ⇒ Object
readonly
Returns the value of attribute filter.
-
#shared_mutex ⇒ Object
readonly
Returns the value of attribute shared_mutex.
-
#virtual_offset_manager ⇒ Object
readonly
Returns the value of attribute virtual_offset_manager.
Attributes inherited from Karafka::Processing::Coordinator
#eofed, #last_polled_at, #partition, #pause_tracker, #seek_offset, #topic
Instance Method Summary
-
#active_within?(interval) ⇒ Boolean
Whether this partition had any activity within the last interval milliseconds.
-
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts the collapse (until the offset following the last message).
-
#filtered? ⇒ Boolean
Did any of the filters apply any logic that would cause us to run the filtering flow.
-
#finished? ⇒ Boolean
Is the coordinated work finished or not.
-
#initialize(*args) ⇒ Coordinator
constructor
A new instance of Coordinator.
-
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
-
#on_finished ⇒ Object
Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any post-jobs coordination processing.
-
#on_revoked ⇒ Object
Runs once after a partition is revoked.
-
#on_started ⇒ Object
Runs the given code only once per all the coordinated jobs, upon starting the first of them.
-
#start(messages) ⇒ Object
Starts the coordination process.
Methods inherited from Karafka::Processing::Coordinator
#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize
Constructor Details
#initialize(*args) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/pro/processing/coordinator.rb', line 22

def initialize(*args)
  super

  @executed = []
  @errors_tracker = errors_tracker_class.new(topic, partition)
  @flow_mutex = Mutex.new

  # Lock for user code synchronization
  # We do not want to mix coordinator lock with the user lock not to create cases where
  # user imposed lock would lock the internal operations of Karafka
  # This shared lock can be used by the end user as it is not used internally by the
  # framework and can be used for user-facing locking
  @shared_mutex = Mutex.new

  @collapser = Collapser.new
  @filter = Coordinators::FiltersApplier.new(self)

  return unless topic.virtual_partitions?

  @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
    topic.name,
    partition,
    topic.virtual_partitions.
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end
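As the constructor comments note, the shared mutex is kept separate from the framework's internal flow lock so a user-held lock can never block Karafka's own coordination. A minimal standalone sketch of that two-lock separation (hypothetical MiniCoordinator class, not the Karafka API):

```ruby
# Two separate locks: one reserved for framework-internal flow, one exposed to
# user code, so user-imposed locking cannot stall internal coordination.
class MiniCoordinator
  attr_reader :shared_mutex

  def initialize
    @flow_mutex = Mutex.new   # internal use only
    @shared_mutex = Mutex.new # safe for user-facing locking
  end

  # Internal synchronization never touches the shared (user) mutex
  def internal_step
    @flow_mutex.synchronize { yield }
  end
end

coordinator = MiniCoordinator.new
results = []

threads = 4.times.map do |i|
  Thread.new do
    # User code serializes its own critical section on the shared mutex
    coordinator.shared_mutex.synchronize { results << i }
  end
end
threads.each(&:join)
```

Because the user critical sections only ever take `shared_mutex`, a long-running user block cannot deadlock against `internal_step`.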
Instance Attribute Details
#errors_tracker ⇒ Object (readonly)
Returns the value of attribute errors_tracker.
# File 'lib/karafka/pro/processing/coordinator.rb', line 19

def errors_tracker
  @errors_tracker
end
#filter ⇒ Object (readonly)
Returns the value of attribute filter.
# File 'lib/karafka/pro/processing/coordinator.rb', line 19

def filter
  @filter
end
#shared_mutex ⇒ Object (readonly)
Returns the value of attribute shared_mutex.
# File 'lib/karafka/pro/processing/coordinator.rb', line 19

def shared_mutex
  @shared_mutex
end
#virtual_offset_manager ⇒ Object (readonly)
Returns the value of attribute virtual_offset_manager.
# File 'lib/karafka/pro/processing/coordinator.rb', line 19

def virtual_offset_manager
  @virtual_offset_manager
end
Instance Method Details
#active_within?(interval) ⇒ Boolean
Note: will also return true if the partition is currently active.
Returns whether this partition had any activity within the last interval milliseconds.
# File 'lib/karafka/pro/processing/coordinator.rb', line 145

def active_within?(interval)
  # its always active if there's any job related to this coordinator that is still
  # enqueued or running
  return true if @running_jobs.values.any?(&:positive?)

  # Otherwise we check last time any job of this coordinator was active
  @changed_at + interval > monotonic_now
end
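The check above can be sketched standalone: a partition counts as active when any job is still running, or when the last state change happened within the interval on a monotonic millisecond clock. This is a simplified re-implementation for illustration, not the Karafka internals:

```ruby
# Monotonic clock in milliseconds (immune to wall-clock adjustments)
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# Active when any job count is positive, otherwise when the last change
# plus the interval still lies in the future
def active_within?(running_jobs, changed_at_ms, interval_ms)
  return true if running_jobs.values.any?(&:positive?)

  changed_at_ms + interval_ms > monotonic_now_ms
end
```

Using a monotonic clock here matters: comparing wall-clock timestamps would misreport activity across NTP adjustments or DST changes.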
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts the collapse (until the offset following the last message).
# File 'lib/karafka/pro/processing/coordinator.rb', line 84

def failure!(consumer, error)
  super
  @errors_tracker << error
  collapse_until!(@last_message.offset + 1)
end
#filtered? ⇒ Boolean
Returns whether any of the filters applied logic that would cause us to run the filtering flow.
# File 'lib/karafka/pro/processing/coordinator.rb', line 92

def filtered?
  @filter.applied?
end
#finished? ⇒ Boolean
Used only in the consume operation context
Returns is the coordinated work finished or not.
# File 'lib/karafka/pro/processing/coordinator.rb', line 98

def finished?
  @running_jobs[:consume].zero?
end
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued
# File 'lib/karafka/pro/processing/coordinator.rb', line 104

def on_enqueued
  @flow_mutex.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end
#on_finished ⇒ Object
Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any post-jobs coordination processing.
# File 'lib/karafka/pro/processing/coordinator.rb', line 124

def on_finished
  @flow_mutex.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
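The on_* hooks all follow the same "run once per coordinated group" pattern: a flow mutex guards a registry of already-executed hook names so each block fires exactly once no matter how many parallel jobs reach it. A standalone sketch of that pattern (hypothetical OnceRunner class, not the Karafka implementation):

```ruby
# Runs a named block at most once, even when invoked from many jobs/threads.
# A mutex serializes the check-and-mark so exactly one caller executes the block.
class OnceRunner
  def initialize
    @mutex = Mutex.new
    @executed = []
  end

  def once(name)
    @mutex.synchronize do
      # Skip if this hook already ran for the current coordinated group
      next if @executed.include?(name)

      @executed << name
      yield
    end
  end
end
```

A reset of the `@executed` registry at the start of each batch (as `#start` does with `@executed.clear`) would re-arm every hook for the next coordinated group.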
#on_revoked ⇒ Object
Runs once after a partition is revoked
# File 'lib/karafka/pro/processing/coordinator.rb', line 134

def on_revoked
  @flow_mutex.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end
#on_started ⇒ Object
Runs the given code only once per all the coordinated jobs, upon starting the first of them.
# File 'lib/karafka/pro/processing/coordinator.rb', line 113

def on_started
  @flow_mutex.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end
#start(messages) ⇒ Object
Starts the coordination process
# File 'lib/karafka/pro/processing/coordinator.rb', line 56

def start(messages)
  super

  @collapser.refresh!(messages.first.offset)

  @filter.apply!(messages)

  # Do not clear coordinator errors storage when we are retrying, so we can reference the
  # errors that have happened during recovery. This can be useful for implementing custom
  # flows. There can be more errors than one when running with virtual partitions so we
  # need to make sure we collect them all. Under collapse when we reference a given
  # consumer we should be able to get all the errors and not just first/last.
  #
  # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
  @errors_tracker.clear if attempt.zero?

  @executed.clear

  # We keep the old processed offsets until the collapsing is done and regular processing
  # with virtualization is restored
  @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

  @last_message = messages.last
end
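The `attempt.zero?` guard above means the error store is wiped only when a fresh batch begins, so errors accumulate across retries and remain inspectable during recovery. A simplified standalone model of that behavior (hypothetical MiniErrorsTracker class, not Karafka's errors tracker):

```ruby
# Collects errors across retries; cleared only when processing starts fresh
# (attempt 0), so recovery flows can inspect everything that went wrong.
class MiniErrorsTracker
  include Enumerable

  def initialize
    @errors = []
  end

  def <<(error)
    @errors << error
  end

  def clear
    @errors.clear
  end

  def each(&block)
    @errors.each(&block)
  end

  def size
    @errors.size
  end
end

tracker = MiniErrorsTracker.new

# Simulated processing: clear only before the first attempt, then each
# retry appends its own error instead of overwriting the previous ones
[0, 1, 2].each do |attempt|
  tracker.clear if attempt.zero?
  tracker << RuntimeError.new("failure on attempt #{attempt}")
end
```

After the loop the tracker holds one error per attempt, which mirrors why virtual partitions need "all the errors and not just first/last" under collapse.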