Module: Karafka::Processing::Strategies::AjDlqMom
- Includes:
- DlqMom
- Defined in:
- lib/karafka/processing/strategies/aj_dlq_mom.rb
Overview
ActiveJob strategy to cooperate with the DLQ.
While AJ is uses MOM by default because it delegates the offset management to the AJ consumer. With DLQ however there is an extra case for skipping broken jobs with offset marking due to ordered processing.
Constant Summary collapse
- FEATURES =
Apply strategy when only when using AJ with MOM and DLQ
%i[ active_job dead_letter_queue manual_offset_management ].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
How should we post-finalize consumption.
Methods included from DlqMom
Methods included from Dlq
#dispatch_to_dlq, #find_skippable_message, #mark_after_dispatch?, #mark_as_consumed, #mark_as_consumed!, #mark_dispatched_to_dlq
Methods included from Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!
Methods included from Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
How should we post-finalize consumption.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/karafka/processing/strategies/aj_dlq_mom.rb', line 22 def handle_after_consume return if revoked? if coordinator.success? # Do NOT commit offsets, they are comitted after each job in the AJ consumer. coordinator.pause_tracker.reset elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries retry_after_pause else coordinator.pause_tracker.reset , = dispatch_to_dlq() # We can commit the offset here because we know that we skip it "forever" and # since AJ consumer commits the offset after each job, we also know that the # previous job was successful mark_dispatched_to_dlq() pause(seek_offset, nil, false) end end |