Class: Karafka::Processing::Coordinator
- Inherits:
- Object
  - Karafka::Processing::Coordinator
- Extended by:
- Forwardable
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/processing/coordinator.rb
Overview
This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but all operations are thread-safe by default so that we do not have to worry about potential future mistakes.
Basic coordinator that allows us to provide coordination objects into consumers.
This is a wrapping layer to simplify management of work to be handled around consumption.
Direct Known Subclasses
Instance Attribute Summary
- #eofed ⇒ Object
  This can be set directly on the listener because it can be triggered on first run without any messages.
- #partition ⇒ Object (readonly)
  Returns the value of attribute partition.
- #pause_tracker ⇒ Object (readonly)
  Returns the value of attribute pause_tracker.
- #seek_offset ⇒ Object
  Returns the value of attribute seek_offset.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #consumption(consumer) ⇒ Karafka::Processing::Result
  Result object which we can use to indicate consumption processing state.
- #decrement(job_type) ⇒ Object
  Decrements the number of jobs we handle at the moment.
- #eofed? ⇒ Boolean
  Did we reach the end of the partition when polling data.
- #failure!(consumer, error) ⇒ Object
  Marks the given consumption on the consumer as failed.
- #failure? ⇒ Boolean
  True if any of the work we were running failed.
- #increment(job_type) ⇒ Object
  Increases the number of jobs that we handle with this coordinator.
- #initialize(topic, partition, pause_tracker) ⇒ Coordinator (constructor)
  A new instance of Coordinator.
- #manual_pause ⇒ Object
  Stores in the coordinator the information that this pause was done manually by the end user and not by the system itself.
- #manual_pause? ⇒ Boolean
  Are we in a pause that was initiated by the user.
- #manual_seek ⇒ Object
  Marks seek as manual for coordination purposes.
- #manual_seek? ⇒ Boolean
  Did a user invoke seek in the current operations scope.
- #marked? ⇒ Boolean
  Was the new seek offset assigned at least once.
- #revoke ⇒ Object
  Marks the given coordinator for the processing group as revoked.
- #revoked? ⇒ Boolean
  Is the partition we are processing revoked or not.
- #start(messages) ⇒ Object
  Starts the coordinator for the given consumption jobs.
- #success!(consumer) ⇒ Object
  Marks the given consumption on the consumer as successful.
- #success? ⇒ Boolean
  Is all the consumption done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.
- #synchronize(&block) ⇒ Object
  Allows running synchronized (locked) code that can operate only from a given thread.
Constructor Details
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 27

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @eofed = false
  @changed_at = monotonic_now
end
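The constructor builds @running_jobs with a default block, so every job type starts at zero without being registered first. A minimal plain-Ruby sketch of that behavior follows; the :shutdown key is a made-up job type used only for illustration.

```ruby
# Stand-in for the @running_jobs counter created in the constructor above.
running_jobs = Hash.new { |hash, key| hash[key] = 0 }

# An unknown key is stored with 0 on first access, so += works immediately.
running_jobs[:consume] += 1
running_jobs[:consume] += 1

# Merely reading a key also stores it (:shutdown is a hypothetical job type).
shutdown_count = running_jobs[:shutdown]

puts "consume: #{running_jobs[:consume]}, shutdown: #{shutdown_count}"
```

Because the default block assigns into the hash, a plain read is enough to create an entry. That is harmless for a counter but worth knowing when inspecting the hash keys.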
Instance Attribute Details
#eofed ⇒ Object
This can be set directly on the listener because it can be triggered on first run without any messages.
# File 'lib/karafka/processing/coordinator.rb', line 20

def eofed
  @eofed
end
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
# File 'lib/karafka/processing/coordinator.rb', line 16

def partition
  @partition
end
#pause_tracker ⇒ Object (readonly)
Returns the value of attribute pause_tracker.
# File 'lib/karafka/processing/coordinator.rb', line 16

def pause_tracker
  @pause_tracker
end
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/karafka/processing/coordinator.rb', line 16

def topic
  @topic
end
Instance Method Details
#consumption(consumer) ⇒ Karafka::Processing::Result
Returns a result object that we can use to indicate the consumption processing state.
# File 'lib/karafka/processing/coordinator.rb', line 191

def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end
#decrement(job_type) ⇒ Object
Decrements the number of jobs we handle at the moment.
# File 'lib/karafka/processing/coordinator.rb', line 90

def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
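The underflow guard in #decrement can be tried in isolation. The sketch below is a simplified stand-in and not Karafka's class: JobCounter and its InvalidStateError are hypothetical names, and the @changed_at bookkeeping is left out.

```ruby
# Hypothetical stand-in for the coordinator's job accounting.
class JobCounter
  # Illustrative error; the real code raises
  # Karafka::Errors::InvalidCoordinatorStateError.
  class InvalidStateError < StandardError; end

  def initialize
    @running_jobs = Hash.new { |hash, key| hash[key] = 0 }
    @mutex = Mutex.new
  end

  def increment(job_type)
    @mutex.synchronize { @running_jobs[job_type] += 1 }
  end

  def decrement(job_type)
    @mutex.synchronize do
      @running_jobs[job_type] -= 1
      # Returning from inside the block still releases the mutex.
      return @running_jobs[job_type] unless @running_jobs[job_type].negative?

      raise InvalidStateError, 'Was zero before decrementation'
    end
  end
end

counter = JobCounter.new
counter.increment(:consume)
remaining = counter.decrement(:consume)

# A second decrement has no matching increment, so it must raise.
underflow =
  begin
    counter.decrement(:consume)
    false
  rescue JobCounter::InvalidStateError
    true
  end

puts "remaining: #{remaining}, underflow detected: #{underflow}"
```

Raising instead of clamping at zero makes an unbalanced increment/decrement pair visible immediately, which matches the "heavily out of sync" comment in the listing above.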
#eofed? ⇒ Boolean
Returns whether we reached the end of the partition when polling data.
# File 'lib/karafka/processing/coordinator.rb', line 155

def eofed?
  @eofed
end
#failure!(consumer, error) ⇒ Object
Marks the given consumption on the consumer as failed.
# File 'lib/karafka/processing/coordinator.rb', line 124

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end
#failure? ⇒ Boolean
Returns true if any of the work we were running failed.
# File 'lib/karafka/processing/coordinator.rb', line 132

def failure?
  @failure
end
#increment(job_type) ⇒ Object
Increases the number of jobs that we handle with this coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 81

def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end
#manual_pause ⇒ Object
Stores in the coordinator the information that this pause was done manually by the end user and not by the system itself.
# File 'lib/karafka/processing/coordinator.rb', line 169

def manual_pause
  @manual_pause = true
end
#manual_pause? ⇒ Boolean
Returns whether we are in a pause that was initiated by the user.
# File 'lib/karafka/processing/coordinator.rb', line 174

def manual_pause?
  paused? && @manual_pause
end
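#manual_pause? calls paused?, which is not defined anywhere on this page. Since the class extends Forwardable, one plausible reading is that paused? is delegated to the pause tracker. The sketch below illustrates that reading only: FakePauseTracker, PauseAwareCoordinator and the def_delegators line are assumptions, not Karafka's actual code.

```ruby
require 'forwardable'

# Hypothetical pause tracker exposing only what this sketch needs.
class FakePauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Hypothetical coordinator-like class.
class PauseAwareCoordinator
  extend Forwardable

  # Assumed delegation: forward #paused? to the tracker.
  def_delegators :@pause_tracker, :paused?

  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  def manual_pause?
    paused? && @manual_pause
  end
end

tracker = FakePauseTracker.new
coordinator = PauseAwareCoordinator.new(tracker)

coordinator.manual_pause
flag_only = coordinator.manual_pause? # flag set, but nothing is paused

tracker.pause
paused_and_flagged = coordinator.manual_pause?

tracker.resume
after_resume = coordinator.manual_pause? # flag still set, pause is over

puts [flag_only, paused_and_flagged, after_resume].inspect
```

Combining both conditions means a stale manual flag cannot report a manual pause once the tracker is no longer paused.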
#manual_seek ⇒ Object
Marks seek as manual for coordination purposes.
# File 'lib/karafka/processing/coordinator.rb', line 179

def manual_seek
  @manual_seek = true
end
#manual_seek? ⇒ Boolean
Returns whether a user invoked seek in the current operations scope.
# File 'lib/karafka/processing/coordinator.rb', line 184

def manual_seek?
  @manual_seek
end
#marked? ⇒ Boolean
Returns whether the new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the first message ever; however, this is insufficient for the DLQ in a scenario where the first message is broken. We would never move past it and would end up in an endless loop.
# File 'lib/karafka/processing/coordinator.rb', line 163

def marked?
  @marked
end
#revoke ⇒ Object
Marks the given coordinator for the processing group as revoked.
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means the consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.
# File 'lib/karafka/processing/coordinator.rb', line 145

def revoke
  synchronize { @revoked = true }
end
#revoked? ⇒ Boolean
Returns whether the partition we are processing is revoked.
# File 'lib/karafka/processing/coordinator.rb', line 150

def revoked?
  @revoked
end
#start(messages) ⇒ Object
Starts the coordinator for the given consumption jobs.
# File 'lib/karafka/processing/coordinator.rb', line 46

def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
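The "set it on the first encounter and never again" rule comes from the `||=` assignment on the last line of #start. A minimal sketch of that behavior, using a hypothetical Message struct and a hypothetical SeekOffsetHolder class in place of the real message and coordinator objects:

```ruby
# Hypothetical message stand-in; real messages carry far more than an offset.
Message = Struct.new(:offset)

# Hypothetical holder that mirrors only the `||=` line from #start.
class SeekOffsetHolder
  attr_reader :seek_offset

  def start(messages)
    # Assigns only while @seek_offset is still nil.
    @seek_offset ||= messages.first.offset
  end
end

holder = SeekOffsetHolder.new
holder.start([Message.new(10), Message.new(11)])
holder.start([Message.new(12), Message.new(13)]) # later batch, no change

puts holder.seek_offset
```

An offset of 0 is also preserved, because 0 is truthy in Ruby and `||=` only reassigns on nil or false.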
#success!(consumer) ⇒ Object
Marks the given consumption on the consumer as successful.
# File 'lib/karafka/processing/coordinator.rb', line 115

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end
#success? ⇒ Boolean
This is only used for consume synchronization.
Returns whether all the consumption is done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.
# File 'lib/karafka/processing/coordinator.rb', line 107

def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
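The two conditions in #success? can be seen with a reduced model. FakeResult and MiniCoordinator below are hypothetical stand-ins. Locking is omitted for brevity, and the initial state of FakeResult is a choice made for this sketch, since the real Processing::Result default is not shown on this page.

```ruby
# Hypothetical result with the success!/success? shape used on this page.
class FakeResult
  def initialize
    @success = false # sketch default; the real default is not shown here
  end

  def success!
    @success = true
  end

  def success?
    @success
  end
end

# Hypothetical, single-threaded reduction of the coordinator.
class MiniCoordinator
  def initialize
    @running_jobs = Hash.new { |hash, key| hash[key] = 0 }
    @consumptions = {}
  end

  def increment(job_type)
    @running_jobs[job_type] += 1
  end

  def decrement(job_type)
    @running_jobs[job_type] -= 1
  end

  def consumption(consumer)
    @consumptions[consumer] ||= FakeResult.new
  end

  def success?
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end

coordinator = MiniCoordinator.new
consumer = Object.new

coordinator.increment(:consume)
coordinator.consumption(consumer).success!
while_running = coordinator.success? # a consume job is still counted

coordinator.decrement(:consume)
after_finish = coordinator.success?

puts "while running: #{while_running}, after finish: #{after_finish}"
```

Even though the only result is already marked successful, the coordinator reports success only once the running consume job count drops back to zero.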
#synchronize(&block) ⇒ Object
We check whether the mutex is already owned by the current thread, so we won’t end up with a deadlock if the user runs coordinated code from inside their own lock.
This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their own logic. It can and should, however, be used for multi-thread strategy applications and other internal operation locks.
Allows running synchronized (locked) code that can operate only from a given thread.
# File 'lib/karafka/processing/coordinator.rb', line 206

def synchronize(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
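The owned? check matters because a plain Ruby Mutex is not reentrant: locking it again from the thread that already holds it raises ThreadError. The sketch below contrasts the two behaviors; ReentrantGuard is a hypothetical class that copies only the #synchronize method shown above.

```ruby
# Hypothetical wrapper reproducing the owned? check from #synchronize.
class ReentrantGuard
  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    if @mutex.owned?
      # Already locked by this thread: run the block directly.
      yield
    else
      @mutex.synchronize(&block)
    end
  end
end

guard = ReentrantGuard.new
nested_result = guard.synchronize { guard.synchronize { :inner_ran } }

# The same nesting on a bare Mutex fails with ThreadError.
plain_mutex = Mutex.new
plain_error =
  begin
    plain_mutex.synchronize { plain_mutex.synchronize { :never } }
    nil
  rescue ThreadError => e
    e.class
  end

puts "nested result: #{nested_result.inspect}, plain mutex error: #{plain_error}"
```

This is what lets methods such as #decrement or #failure! call #synchronize safely even when the caller already holds the coordinator lock.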