Module: Karafka::Processing::Strategies::Dlq
Overview
When using dead letter queue, processing won’t stop after defined number of retries upon encountering non-critical errors but the messages that error will be moved to a separate topic with their payload and metadata, so they can be handled differently.
Constant Summary collapse
- FEATURES =
Apply strategy when only dead letter queue is turned on
%i[ dead_letter_queue ].freeze
Instance Method Summary collapse
-
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
-
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message may want to skip (all, starting from first).
-
#handle_after_consume ⇒ Object
When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out.
-
#mark_after_dispatch? ⇒ Boolean
Should we mark given message as consumed after dispatch.
-
#mark_as_consumed(message) ⇒ Object
Override of the standard
#mark_as_consumed
in order to handle the pause tracker reset in case DLQ is marked as fully independent. -
#mark_as_consumed!(message) ⇒ Object
Override of the standard
#mark_as_consumed!
. -
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks message that went to DLQ (if applicable) based on the requested method.
Methods included from Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap
Methods included from Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 112 def dispatch_to_dlq() producer.public_send( topic.dead_letter_queue.dispatch_method, topic: topic.dead_letter_queue.topic, payload: .raw_payload ) # Notify about dispatch on the events bus monitor.instrument( 'dead_letter_queue.dispatched', caller: self, message: ) end |
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message may want to skip (all, starting from first)
96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 96 def = .find do |msg| coordinator.marked? && msg.offset == seek_offset end # If we don't have the message matching the last comitted offset, it means that # user operates with manual offsets and we're beyond the batch in which things # broke for the first time. Then we skip the first (as no markings) and we # move on one by one. ? [, true] : [.first, false] end |
#handle_after_consume ⇒ Object
When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 55 def handle_after_consume return if revoked? if coordinator.success? coordinator.pause_tracker.reset return if coordinator.manual_pause? mark_as_consumed(.last) elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries retry_after_pause # If we've reached number of retries that we could, we need to skip the first message # that was not marked as consumed, pause and continue, while also moving this message # to the dead topic else # We reset the pause to indicate we will now consider it as "ok". coordinator.pause_tracker.reset , = # Send skippable message to the dql topic dispatch_to_dlq() # We mark the broken message as consumed and move on if mark_after_dispatch? mark_dispatched_to_dlq() return if revoked? else self.seek_offset = .offset + 1 end # We pause to backoff once just in case. pause(seek_offset, nil, false) end end |
#mark_after_dispatch? ⇒ Boolean
Returns should we mark given message as consumed after dispatch. For default non MOM strategies if user did not explicitly tell us not to, we mark it. Default is nil
, which means true
in this case. If user provided alternative value, we go with it.
131 132 133 134 135 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 131 def mark_after_dispatch? return true if topic.dead_letter_queue.mark_after_dispatch.nil? topic.dead_letter_queue.mark_after_dispatch end |
#mark_as_consumed(message) ⇒ Object
Override of the standard #mark_as_consumed
in order to handle the pause tracker reset in case DLQ is marked as fully independent. When DLQ is marked independent, any offset marking causes the pause count tracker to reset. This is useful when the error is not due to the collective batch operations state but due to intermediate “crawling” errors that move with it
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 25 def mark_as_consumed() # If we are not retrying pause count is already 0, no need to try to reset the state return super unless # If we do not use independent marking on DLQ, we just mark as consumed return super unless topic.dead_letter_queue.independent? # If we were not able to mark no need to reset return false unless super coordinator.pause_tracker.reset true end |
#mark_as_consumed!(message) ⇒ Object
Override of the standard #mark_as_consumed!
. Resets the pause tracker count in case DLQ was configured with the independent
flag.
43 44 45 46 47 48 49 50 51 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 43 def mark_as_consumed!() return super unless return super unless topic.dead_letter_queue.independent? return false unless super coordinator.pause_tracker.reset true end |
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks message that went to DLQ (if applicable) based on the requested method
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 139 def mark_dispatched_to_dlq() case topic.dead_letter_queue.marking_method when :mark_as_consumed mark_as_consumed() when :mark_as_consumed! mark_as_consumed!() else # This should never happen. Bug if encountered. Please report raise Karafka::Errors::UnsupportedCaseError end end |