Module: Karafka::Pro::Processing::Strategies::Default
- Includes: Base, Karafka::Processing::Strategies::Default
- Included in: Aj::DlqLrjMom, Aj::DlqMomVp, Aj::LrjMomVp, Aj::MomVp, Karafka::Pro::Processing::Strategies::Dlq::Default, Ftr::Default, Lrj::Default, Lrj::Mom, Mom::Default, Vp::Default
- Defined in: lib/karafka/pro/processing/strategies/default.rb
Overview
No features enabled:
- No manual offset management
- No long-running jobs
- No virtual partitions

Nothing else. Just the standard, automatic flow.
Constant Summary
- FEATURES = %i[].freeze
  Apply strategy for a non-feature-based flow.
Instance Method Summary
- #handle_after_consume ⇒ Object
  Standard flow without any features.
- #handle_before_consume ⇒ Object
  Increments the number of attempts per one “full” job.
- #handle_before_schedule_consume ⇒ Object
  No actions needed for the standard flow here.
- #handle_before_schedule_tick ⇒ Object
  No action needed for the tick standard flow.
- #handle_consume ⇒ Object
  Runs the user consumption code.
- #handle_revoked ⇒ Object
  Standard flow for revocation.
- #handle_tick ⇒ Object
  Runs the consumer #tick method with reporting.
- #mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
  Marks a message as consumed asynchronously.
- #mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
  Marks a message as consumed synchronously.
- #mark_in_memory(message) ⇒ Boolean
  Marks the current state only in memory, because the offset has already been marked using the producer transaction.
- #mark_in_transaction(message, offset_metadata, async) ⇒ Object
  Stores the next offset for processing inside the transaction and records it in a local accumulator for the post-transaction status update.
- #mark_with_transaction(message, offset_metadata, async) ⇒ Boolean
  Returns false if marking failed, otherwise true.
- #store_offset_metadata(offset_metadata) ⇒ Object
  Sets offset metadata that will be used by the upcoming marking as consumed, unless different offset metadata is passed explicitly.
- #transaction(active_producer = producer) { ... } ⇒ Object
  Starts a producer transaction, saves the transaction context for transactional marking and runs user code in this context.
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown, #handle_wrap
Methods included from Karafka::Processing::Strategies::Base
#handle_idle, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 309
def handle_after_consume
  coordinator.on_finished do ||
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      # Do not mark last message if pause happened. This prevents a scenario where pause
      # is overridden upon rebalance by marking
      return if coordinator.manual_pause?

      mark_as_consumed()
    else
      retry_after_pause
    end
  end
end
```
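The branching in `#handle_after_consume` can be summarized as a pure decision function. The sketch below is illustrative only: `after_consume_action` and its keyword arguments are hypothetical stand-ins for `revoked?`, `coordinator.success?` and `coordinator.manual_pause?`, and the surrounding `coordinator.on_finished` wrapper is left out.

```ruby
# Hypothetical model of the post-consumption decision in #handle_after_consume.
# Returns the actions the real method would take, in order.
def after_consume_action(revoked:, success:, manual_pause:)
  # A revoked partition is left alone: no marking, no retry
  return [] if revoked

  if success
    # On success the pause tracker is reset first
    actions = [:reset_pause_tracker]
    # The last message is not marked after a manual pause, so that the pause
    # is not overridden upon rebalance by marking
    actions << :mark_last_message unless manual_pause
    actions
  else
    # On failure, back off and retry processing later
    [:retry_after_pause]
  end
end

p after_consume_action(revoked: false, success: true, manual_pause: false)
# => [:reset_pause_tracker, :mark_last_message]
```

Keeping the decision separate from side effects like this makes each of the four outcomes easy to check in isolation.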
#handle_before_consume ⇒ Object
Increments the number of attempts per one “full” job. For all virtual partitions (VPs) of a single topic partition, this should also run only once.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 276
def handle_before_consume
  coordinator.on_started do
    coordinator.pause_tracker.increment
  end
end
```
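The requirement that this hook runs once per topic partition, even when several virtual partitions process it, can be illustrated with a run-once guard. This is a minimal sketch assuming a mutex-protected flag; `OnceHook` is a hypothetical class and not how Karafka's coordinator implements `on_started`.

```ruby
# Hypothetical run-once hook: many virtual-partition jobs may call it,
# but only the first call executes the block.
class OnceHook
  def initialize
    @mutex = Mutex.new
    @ran = false
  end

  # Yields only on the first call; later calls are no-ops
  def call
    @mutex.synchronize do
      return if @ran

      @ran = true
      yield
    end
  end
end

attempts = 0
hook = OnceHook.new
# Simulate four virtual partitions of one topic partition starting concurrently
threads = Array.new(4) { Thread.new { hook.call { attempts += 1 } } }
threads.each(&:join)
puts attempts # => 1
```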
#handle_before_schedule_consume ⇒ Object
No actions are needed for the standard flow here beyond emitting the consumer.before_schedule_consume instrumentation event.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 268
def handle_before_schedule_consume
  monitor.instrument('consumer.before_schedule_consume', caller: self)

  nil
end
```
#handle_before_schedule_tick ⇒ Object
No action is needed for the tick standard flow beyond emitting the consumer.before_schedule_tick instrumentation event.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 344
def handle_before_schedule_tick
  monitor.instrument('consumer.before_schedule_tick', caller: self)

  nil
end
```
#handle_consume ⇒ Object
Run the user consumption code
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 283
def handle_consume
  # We should not run the work at all on a partition that was revoked
  # This can happen primarily when an LRJ job gets to the internal worker queue and
  # this partition is revoked prior processing.
  unless revoked?
    monitor.instrument('consumer.consume', caller: self)
    monitor.instrument('consumer.consumed', caller: self) do
      consume
    end
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  # If failed, mark as failed
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has
  # finished
  coordinator.decrement(:consume)
end
```
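The error handling in `#handle_consume` follows a record, re-raise and always-decrement pattern. The sketch below reproduces that shape with a hypothetical `ToyCoordinator` and `run_consume`; it omits the revocation check and instrumentation and does not use Karafka.

```ruby
# Hypothetical stand-in for the coordinator; it only records what happened.
class ToyCoordinator
  attr_reader :results, :running

  def initialize(running)
    @running = running # number of jobs this coordinator still waits for
    @results = []
  end

  def success!(consumer)
    @results << [:success, consumer]
  end

  def failure!(consumer, error)
    @results << [:failure, consumer, error.message]
  end

  def decrement
    @running -= 1
  end
end

# Mirrors the shape of #handle_consume: run user code, record the outcome,
# re-raise failures, and always decrement the job counter
def run_consume(coordinator, consumer)
  yield
  coordinator.success!(consumer)
rescue StandardError => e
  coordinator.failure!(consumer, e)
  raise e
ensure
  coordinator.decrement
end

coordinator = ToyCoordinator.new(2)
run_consume(coordinator, :c1) { :ok }
begin
  run_consume(coordinator, :c2) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # The error still reaches the caller, as in the original method
end
p coordinator.results # => [[:success, :c1], [:failure, :c2, "boom"]]
p coordinator.running # => 0
```

Putting the decrement in `ensure` guarantees the job count reaches zero even when user code raises, which is what lets the coordinator later run its "finished" logic.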
#handle_revoked ⇒ Object
Standard flow for revocation
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 328
def handle_revoked
  coordinator.on_revoked do
    resume

    coordinator.revoke
  end

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end
```
#handle_tick ⇒ Object
Runs the consumer #tick method with reporting.

```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 351
def handle_tick
  monitor.instrument('consumer.tick', caller: self)
  monitor.instrument('consumer.ticked', caller: self) do
    tick
  end
ensure
  coordinator.decrement(:periodic)
end
```
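`#handle_tick` emits one event before running `#tick` and wraps the call itself in a second, block-based instrumentation. The toy monitor below is a hypothetical stand-in for Karafka's monitor; recording the block event after the block returns is an assumption made for illustration, and the hash counter stands in for `coordinator.decrement(:periodic)`.

```ruby
# Hypothetical toy monitor: records event names; with a block, it records the
# event after the block finishes and returns the block's value.
class ToyMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(event, _payload = {})
    result = block_given? ? yield : nil
    @events << event
    result
  end
end

# Same shape as #handle_tick: notify, run the user code under instrumentation,
# and always decrement the periodic job counter
def handle_tick_model(monitor, counter)
  monitor.instrument('consumer.tick')
  monitor.instrument('consumer.ticked') { yield }
ensure
  counter[:periodic] -= 1
end

monitor = ToyMonitor.new
counter = { periodic: 1 }
handle_tick_model(monitor, counter) { monitor.events << 'user tick code' }
p monitor.events    # => ["consumer.tick", "user tick code", "consumer.ticked"]
p counter[:periodic] # => 0
```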
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks a message as consumed asynchronously.

Note: we keep track of this offset in case we mark a message as consumed and then get an error while processing another message. In that case, we do not pause on the message we have already processed but on the next one. This applies to both the sync and async versions of this method.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 48
def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  # If we are inside a transaction then we can just mark as consumed within it
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, true)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore earlier offsets than the one we already committed
    return true if seek_offset > message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, true)
             else
               client.mark_as_consumed(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
```
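The guard clauses in the non-transactional branch decide whether a marking is skipped, rejected or applied. The following sketch models only those guards and the `seek_offset` update; `ToyMarker` is hypothetical and assumes the client store always succeeds, so the `return revoked? unless stored` path is not modeled.

```ruby
# Hypothetical model of the offset guards in the non-transactional branch of
# #mark_as_consumed; assumes the underlying client store always succeeds.
class ToyMarker
  attr_accessor :seek_offset, :revoked

  def initialize(seek_offset)
    @seek_offset = seek_offset
    @revoked = false
  end

  def mark(offset)
    # nil seek offset means #seek was invoked with an offset reset request
    return true if seek_offset.nil?
    # Offsets earlier than the one already committed are ignored
    return true if seek_offset > offset
    return false if revoked

    # (the original method stores the offset via the client at this point)
    # Advance the in-memory position to the next message to process
    self.seek_offset = offset + 1
    true
  end
end

marker = ToyMarker.new(10)
marker.mark(12) # => true, seek_offset becomes 13
marker.mark(11) # => true, ignored because 13 > 11
marker.revoked = true
marker.mark(20) # => false, the assignment was revoked
```

Note that an ignored earlier offset still returns `true`: from the caller's point of view that position is already covered.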
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks a message as consumed synchronously.

```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 86
def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, false)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore earlier offsets than the one we already committed
    return true if seek_offset > message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, false)
             else
               client.mark_as_consumed!(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
```
#mark_in_memory(message) ⇒ Boolean
Marks the current state only in memory, because the offset has already been marked using the producer transaction.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 251
def mark_in_memory(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if seek_offset.nil?
  # Ignore earlier offsets than the one we already committed
  return true if seek_offset > message.offset
  return false if revoked?

  # If we have already marked this successfully in a transaction that was running
  # we should not mark it again with the client offset delegation but instead we should
  # just align the in-memory state
  self.seek_offset = message.offset + 1

  true
end
```
#mark_in_transaction(message, offset_metadata, async) ⇒ Object
Stores the next offset for processing inside the transaction and records it in a local accumulator for the post-transaction status update.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 206
def mark_in_transaction(message, offset_metadata, async)
  raise Errors::TransactionRequiredError unless @_in_transaction
  raise Errors::AssignmentLostError if revoked?

  producer.transaction_mark_as_consumed(
    client,
    message,
    offset_metadata
  )

  @_in_transaction_marked = true
  @_transaction_marked ||= []
  @_transaction_marked << [message, offset_metadata, async]
end
```
#mark_with_transaction(message, offset_metadata, async) ⇒ Boolean
Marks the message as consumed inside an artificially created, internal transaction. Returns false if marking failed, otherwise true.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 227
def mark_with_transaction(message, offset_metadata, async)
  # This flag is used by VPs to differentiate between user initiated transactions and
  # post-execution system transactions.
  @_transaction_internal = true

  transaction do
    mark_in_transaction(message, offset_metadata, async)
  end

  true
# We handle both cases here because this is a private API for internal usage and we want
# the post-user code execution marking with transactional producer to result in a
# boolean state of marking for further framework flow. This is a normalization to make it
# behave the same way as it would behave with a non-transactional one
rescue ::Rdkafka::RdkafkaError, Errors::AssignmentLostError
  false
ensure
  @_transaction_internal = false
end
```
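`#mark_with_transaction` turns two expected failure types into a `false` return value while always clearing its internal flag. In this sketch, `ToyRdkafkaError` and `ToyAssignmentLostError` are hypothetical substitutes for `::Rdkafka::RdkafkaError` and `Errors::AssignmentLostError`, and the block stands in for the transactional marking.

```ruby
# Hypothetical error classes standing in for ::Rdkafka::RdkafkaError and
# Errors::AssignmentLostError
class ToyRdkafkaError < StandardError; end
class ToyAssignmentLostError < StandardError; end

class ToyTransactionalMarker
  attr_reader :internal

  def initialize
    @internal = false
  end

  # Runs the marking block as an "internal" transaction and converts the two
  # expected failure types into false; any other error propagates
  def mark_with_transaction
    @internal = true
    yield
    true
  rescue ToyRdkafkaError, ToyAssignmentLostError
    false
  ensure
    # The flag is always cleared, whether marking succeeded, failed or raised
    @internal = false
  end
end

marker = ToyTransactionalMarker.new
marker.mark_with_transaction { :marked }                      # => true
marker.mark_with_transaction { raise ToyAssignmentLostError } # => false
marker.internal                                               # => false
```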
#store_offset_metadata(offset_metadata) ⇒ Object
Allows setting offset metadata that will be used by the upcoming marking as consumed, as long as different offset metadata is not passed explicitly. Once it has been used, either via #mark_as_consumed or #mark_as_consumed!, it is set back to nil. This gives the end user the ability to influence metadata on non-user-initiated markings in complex flows.

Note: offset metadata set this way will be passed to any marking as consumed, even one that was not user initiated, for example in the DLQ flow.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 33
def store_offset_metadata(offset_metadata)
  @_current_offset_metadata = offset_metadata
end
```
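Because `@_current_offset_metadata` is the default argument of both marking methods and is reset in their `ensure` clauses, stored metadata applies to exactly one marking. The hypothetical `ToyMetadataMarker` below reproduces that one-shot behavior without Karafka.

```ruby
# Hypothetical model of one-shot offset metadata: the stored value is used by the
# next marking (unless one is passed explicitly) and then cleared in `ensure`.
class ToyMetadataMarker
  attr_reader :marked

  def initialize
    @current_offset_metadata = nil
    @marked = []
  end

  def store_offset_metadata(offset_metadata)
    @current_offset_metadata = offset_metadata
  end

  # The default argument is evaluated at call time, so it picks up stored metadata
  def mark_as_consumed(offset, offset_metadata = @current_offset_metadata)
    @marked << [offset, offset_metadata]
    true
  ensure
    # Reset after every marking attempt, as in the original methods
    @current_offset_metadata = nil
  end
end

m = ToyMetadataMarker.new
m.store_offset_metadata('checkpoint-a')
m.mark_as_consumed(5) # uses 'checkpoint-a'
m.mark_as_consumed(6) # metadata already cleared, uses nil
m.marked              # => [[5, "checkpoint-a"], [6, nil]]
```

The same reset also happens when metadata is passed explicitly, so a stored value never leaks past the next marking call.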
#transaction(active_producer = producer) { ... } ⇒ Object
Starts a producer transaction, saves the transaction context for transactional marking and runs user code in this context.

Transactions on the consumer level differ from those initiated by the producer because they allow marking offsets inside the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.

Note: if you provide the producer, it will replace the consumer's producer for the duration of the transaction. This means that if you refer to Consumer#producer from other threads, even accidentally, you will get the reassigned producer and not the one initially assigned. This is done so that the message-producing aliases operate from within the transaction; since the producer is locked during a transaction, other threads are prevented from using it.
```ruby
# File 'lib/karafka/pro/processing/strategies/default.rb', line 137
def transaction(active_producer = producer)
  default_producer = nil
  transaction_started = nil

  monitor.instrument('consumer.consuming.transaction', caller: self) do
    default_producer = producer
    self.producer = active_producer

    transaction_started = false

    # Prevent from nested transactions. It would not make any sense
    raise Errors::TransactionAlreadyInitializedError if @_in_transaction

    transaction_started = true
    @_transaction_marked = []
    @_in_transaction = true
    @_in_transaction_marked = false

    producer.transaction do
      yield

      # Ensure this transaction is rolled back if we have lost the ownership of this
      # transaction. We do it only for transactions that contain offset management as for
      # producer only, this is not relevant.
      raise Errors::AssignmentLostError if @_in_transaction_marked && revoked?
    end

    @_in_transaction = false

    # This offset is already stored in transaction but we set it here anyhow because we
    # want to make sure our internal in-memory state is aligned with the transaction
    #
    # @note We never need to use the blocking `#mark_as_consumed!` here because the
    #   offset anyhow was already stored during the transaction
    #
    # @note Since the offset could have been already stored in Kafka (could have because
    #   you can have transactions without marking), we use the `@_in_transaction_marked`
    #   state to decide if we need to dispatch the offset via client at all
    #   (if post transaction, then we do not have to)
    #
    # @note In theory we could only keep reference to the most recent marking and reject
    #   others. We however do not do it for two reasons:
    #   - User may have non standard flow relying on some alternative order and we want
    #     to mimic this
    #   - Complex strategies like VPs can use this in VPs to mark in parallel without
    #     having to redefine the transactional flow completely
    @_transaction_marked.each do |marking|
      marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking)
    end

    true
  end
ensure
  self.producer = default_producer if transaction_started

  @_transaction_marked.clear
  @_in_transaction = false
  @_in_transaction_marked = false
end
```
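The core of `#transaction` is an accumulator: markings made inside the transaction are recorded with their async flag and replayed, in order, once the producer transaction completes. This sketch uses a hypothetical `ToyTransactionalConsumer` with no producer, rollback or revocation handling; `marking.pop` selects the async or sync path the same way as the original loop.

```ruby
# Hypothetical model of how #transaction accumulates markings and replays them
# into in-memory state after the transaction finishes. No Kafka involved.
class ToyTransactionalConsumer
  attr_reader :replayed

  def initialize
    @in_transaction = false
    @transaction_marked = []
    @replayed = []
  end

  # Inside a transaction, markings are only recorded together with their mode
  def mark_in_transaction(offset, offset_metadata, async)
    raise 'transaction required' unless @in_transaction

    @transaction_marked << [offset, offset_metadata, async]
  end

  def transaction
    raise 'nested transactions are not allowed' if @in_transaction

    @in_transaction = true
    yield # the original runs this inside producer.transaction
    @in_transaction = false
    # Replay in original order; the last element of each entry picks async vs sync
    @transaction_marked.each do |marking|
      mode = marking.pop ? :async : :sync
      @replayed << [mode, *marking]
    end
    true
  ensure
    @transaction_marked.clear
    @in_transaction = false
  end
end

consumer = ToyTransactionalConsumer.new
consumer.transaction do
  consumer.mark_in_transaction(10, nil, true)
  consumer.mark_in_transaction(11, 'meta', false)
end
consumer.replayed # => [[:async, 10, nil], [:sync, 11, "meta"]]
```

Replaying every recorded marking, rather than only the last one, preserves the user's marking order, which matches the reasoning in the original `@note` comments.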