Module: Karafka::Pro::Processing::Strategies::Dlq::Default
- Defined in:
- lib/karafka/pro/processing/strategies/dlq/default.rb
Overview
Strategy applied when only the dead letter queue feature is enabled.
Constant Summary
- FEATURES =
Features for this strategy
%i[ dead_letter_queue ].freeze
Instance Method Summary
-
#apply_dlq_flow ⇒ Object
Runs the DLQ strategy and, based on its result, performs certain operations.
-
#build_dlq_message(skippable_message) ⇒ Hash
Builds the DLQ message to dispatch.
-
#dispatch_if_needed_and_mark_as_consumed ⇒ Object
Dispatches the message to the DLQ (when needed and when applicable based on settings) and marks this message as consumed for non MOM flows.
-
#dispatch_in_a_transaction? ⇒ Boolean
Should we use a transaction to move the data to the DLQ.
-
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
-
#dispatch_to_dlq? ⇒ Boolean
Should we dispatch the message to DLQ or not.
-
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message we may want to skip (all, starting from first).
-
#handle_after_consume ⇒ Object
When we encounter a non-recoverable message, we skip it and go on with our lives.
-
#mark_after_dispatch? ⇒ Boolean
Should we mark given message as consumed after dispatch.
-
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard #mark_as_consumed in order to handle the pause tracker reset in case DLQ is marked as fully independent.
-
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard #mark_as_consumed!.
-
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks message that went to DLQ (if applicable) based on the requested method.
Methods included from Karafka::Pro::Processing::Strategies::Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#apply_dlq_flow ⇒ Object
Runs the DLQ strategy and, based on its result, performs certain operations.
In case of :skip and :dispatch, runs the exact flow provided in the block. In case of :retry, #retry_after_pause is always applied.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 207

    def apply_dlq_flow
      flow = topic.dead_letter_queue.strategy.call(errors_tracker, attempt)

      case flow
      when :retry
        retry_after_pause

        return
      when :skip
        @_dispatch_to_dlq = false
      when :dispatch
        @_dispatch_to_dlq = true
      else
        raise Karafka::UnsupportedCaseError, flow
      end

      yield

      # We reset the pause to indicate we will now consider it as "ok".
      coordinator.pause_tracker.reset

      # Always backoff after DLQ dispatch even on skip to prevent overloads on errors
      pause(seek_offset, nil, false)
    end
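The symbol returned by topic.dead_letter_queue.strategy.call(errors_tracker, attempt) decides which branch runs. A minimal sketch of such a callable follows, assuming only the call(errors_tracker, attempt) signature and the three return values handled above. The class name SkipParsingErrorsStrategy, the attempt limit, and the FakeErrorsTracker stand-in (with its last reader) are hypothetical and exist only so the example runs without Karafka.

```ruby
require 'json'

# Hypothetical stand-in for the errors tracker; exposes only the most recent error.
FakeErrorsTracker = Struct.new(:last)

# Hypothetical strategy: decides what the DLQ flow should do after a failure.
class SkipParsingErrorsStrategy
  MAX_ATTEMPTS = 3 # assumed limit, not a Karafka default

  # @param errors_tracker [#last] object exposing the most recent error
  # @param attempt [Integer] current processing attempt (starting at 1)
  # @return [Symbol] :retry, :skip or :dispatch
  def call(errors_tracker, attempt)
    # Malformed payloads will not improve on retry, so drop them right away
    return :skip if errors_tracker.last.is_a?(JSON::ParserError)
    # Give other errors a few chances before moving the message to the DLQ
    return :retry if attempt < MAX_ATTEMPTS

    :dispatch
  end
end

strategy = SkipParsingErrorsStrategy.new
strategy.call(FakeErrorsTracker.new(JSON::ParserError.new), 1) # :skip
strategy.call(FakeErrorsTracker.new(RuntimeError.new), 1)      # :retry
strategy.call(FakeErrorsTracker.new(RuntimeError.new), 3)      # :dispatch
```

Any other return value would hit the else branch above and raise.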
#build_dlq_message(skippable_message) ⇒ Hash
Returns the DLQ message hash to dispatch.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 147

    def build_dlq_message(skippable_message)
      original_partition = skippable_message.partition.to_s

      dlq_message = {
        topic: topic.dead_letter_queue.topic,
        key: original_partition,
        payload: skippable_message.raw_payload,
        headers: skippable_message.raw_headers.merge(
          'original_topic' => topic.name,
          'original_partition' => original_partition,
          'original_offset' => skippable_message.offset.to_s,
          'original_consumer_group' => topic.consumer_group.id,
          'original_key' => skippable_message.raw_key.to_s,
          'original_attempts' => attempt.to_s
        )
      }

      # Optional method user can define in consumer to enhance the dlq message hash with
      # some extra details if needed or to replace payload, etc
      if respond_to?(:enhance_dlq_message, true)
        enhance_dlq_message(
          dlq_message,
          skippable_message
        )
      end

      dlq_message
    end
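The optional enhance_dlq_message hook is looked up with respond_to?(:enhance_dlq_message, true), so it may be private. The sketch below shows one way a consumer might use it, assuming the hook receives the built hash and the skipped message, in that order, and that it changes the hash in place. FakeMessage, OrdersConsumer, the build helper, and the added header are hypothetical stand-ins so the example runs without Karafka.

```ruby
# Hypothetical stand-in for a Karafka message, with just the fields used here.
FakeMessage = Struct.new(:raw_payload, :raw_headers, :offset, keyword_init: true)

# Hypothetical consumer-like class; in a real app this would be a Karafka consumer.
class OrdersConsumer
  # Mimics the tail of #build_dlq_message: build the hash, then let the
  # optional hook adjust it before it is returned.
  def build(skippable_message)
    dlq_message = {
      payload: skippable_message.raw_payload,
      headers: skippable_message.raw_headers.merge(
        'original_offset' => skippable_message.offset.to_s
      )
    }

    if respond_to?(:enhance_dlq_message, true)
      enhance_dlq_message(dlq_message, skippable_message)
    end

    dlq_message
  end

  private

  # Mutates the hash in place; the caller ignores the return value.
  def enhance_dlq_message(dlq_message, skippable_message)
    dlq_message[:headers]['failure_source'] = 'orders_consumer' # illustrative header
    dlq_message[:key] = "orders-#{skippable_message.offset}"
  end
end

message = FakeMessage.new(raw_payload: '{}', raw_headers: {}, offset: 42)
dlq_message = OrdersConsumer.new.build(message)
dlq_message[:key] # "orders-42"
```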
#dispatch_if_needed_and_mark_as_consumed ⇒ Object
Dispatches the message to the DLQ (when needed and when applicable based on settings) and marks this message as consumed for non MOM flows.
If the producer is transactional and the configuration allows it, a transaction is used to do that.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 125

    def dispatch_if_needed_and_mark_as_consumed
      skippable_message, = find_skippable_message

      dispatch = lambda do
        dispatch_to_dlq(skippable_message) if dispatch_to_dlq?

        if mark_after_dispatch?
          mark_dispatched_to_dlq(skippable_message)
        else
          self.seek_offset = skippable_message.offset + 1
        end
      end

      if dispatch_in_a_transaction?
        transaction { dispatch.call }
      else
        dispatch.call
      end
    end
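The method wraps its work in a lambda so the same steps can run either inside a transaction or directly. The sketch below illustrates only that pattern. FakeDispatcher and its logged markers are hypothetical, and its transaction method is a simple stand-in, not the Karafka or WaterDrop transaction API.

```ruby
# Hypothetical dispatcher that records each step, so the flow is observable.
class FakeDispatcher
  def initialize(transactional:)
    @transactional = transactional
    @log = []
  end

  # Runs the same work either wrapped in a transaction or directly.
  # @return [Array<Symbol>] the recorded steps, in order
  def run
    work = lambda do
      @log << :dispatch
      @log << :mark
    end

    if @transactional
      transaction { work.call }
    else
      work.call
    end

    @log
  end

  private

  # Stand-in for a producer transaction: brackets the block with markers.
  def transaction
    @log << :begin
    yield
    @log << :commit
  end
end

FakeDispatcher.new(transactional: true).run  # [:begin, :dispatch, :mark, :commit]
FakeDispatcher.new(transactional: false).run # [:dispatch, :mark]
```

Keeping the dispatch and the marking in one callable means both steps share the same transactional boundary when one is used.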
#dispatch_in_a_transaction? ⇒ Boolean
Returns whether we should use a transaction to move the data to the DLQ. We can do it only when the producer is transactional and the configuration for DLQ transactional dispatches is not set to false.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 189

    def dispatch_in_a_transaction?
      producer.transactional? && topic.dead_letter_queue.transactional?
    end
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 99

    def dispatch_to_dlq(skippable_message)
      # DLQ should never try to dispatch a message that was cleaned. If message was
      # cleaned, we will not have all the needed data. If you see this error, it means
      # that your processing flow is not as expected and you have cleaned message that
      # should not be cleaned as it should go to the DLQ
      raise(Cleaner::Errors::MessageCleanedError) if skippable_message.cleaned?

      producer.public_send(
        topic.dead_letter_queue.dispatch_method,
        build_dlq_message(
          skippable_message
        )
      )

      # Notify about dispatch on the events bus
      monitor.instrument(
        'dead_letter_queue.dispatched',
        caller: self,
        message: skippable_message
      )
    end
#dispatch_to_dlq? ⇒ Boolean
Returns whether we should dispatch the message to the DLQ. When the dispatch topic is set to false, we skip the dispatch, effectively ignoring the broken message without taking any action.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 179

    def dispatch_to_dlq?
      return false unless topic.dead_letter_queue.topic
      return false unless @_dispatch_to_dlq

      true
    end
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message we may want to skip (all, starting from first).
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 83

    def find_skippable_message
      skippable_message = messages.find do |msg|
        coordinator.marked? && msg.offset == seek_offset
      end

      # If we don't have the message matching the last committed offset, it means that
      # user operates with manual offsets and we're beyond the batch in which things
      # broke for the first time. Then we skip the first (as no markings) and we
      # move on one by one.
      skippable_message ? [skippable_message, true] : [messages.first, false]
    end
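The selection rule can be tried in isolation. The sketch below mirrors the logic above with plain objects. OffsetMessage and pick_skippable are hypothetical names, and the marked and seek_offset arguments stand in for coordinator.marked? and seek_offset.

```ruby
# Hypothetical stand-in for a message; only the offset matters here.
OffsetMessage = Struct.new(:offset)

# Prefer the message at seek_offset when something was already marked,
# otherwise fall back to the first message in the batch.
# @return [Array<(OffsetMessage, Boolean)>] the message and whether it was an exact match
def pick_skippable(messages, seek_offset:, marked:)
  found = messages.find { |msg| marked && msg.offset == seek_offset }

  found ? [found, true] : [messages.first, false]
end

batch = [10, 11, 12].map { |offset| OffsetMessage.new(offset) }

pick_skippable(batch, seek_offset: 11, marked: true)  # message at offset 11, true
pick_skippable(batch, seek_offset: 11, marked: false) # message at offset 10, false
pick_skippable(batch, seek_offset: 99, marked: true)  # message at offset 10, false
```

The boolean in the second position tells the caller whether the returned message matched seek_offset or is only the fallback.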
#handle_after_consume ⇒ Object
When we encounter a non-recoverable message, we skip it and go on with our lives.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 61

    def handle_after_consume
      coordinator.on_finished do |last_group_message|
        return if revoked?

        if coordinator.success?
          coordinator.pause_tracker.reset

          return if coordinator.manual_pause?

          mark_as_consumed(last_group_message)
        else
          apply_dlq_flow do
            dispatch_if_needed_and_mark_as_consumed
          end
        end
      end
    end
#mark_after_dispatch? ⇒ Boolean
Returns whether we should mark the given message as consumed after dispatch. For default non-MOM strategies, we mark it unless the user explicitly told us not to. The default is nil, which means true in this case. If the user provided an alternative value, we go with it.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 197

    def mark_after_dispatch?
      return true if topic.dead_letter_queue.mark_after_dispatch.nil?

      topic.dead_letter_queue.mark_after_dispatch
    end
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard #mark_as_consumed in order to handle the pause tracker reset in case DLQ is marked as fully independent. When DLQ is marked independent, any offset marking causes the pause count tracker to reset. This is useful when the error is not due to the collective batch operations state but due to intermediate “crawling” errors that move with it.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 30

    def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
      return super unless retrying?
      return super unless topic.dead_letter_queue.independent?
      return false unless super

      coordinator.pause_tracker.reset

      true
    ensure
      @_current_offset_metadata = nil
    end
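The effect of the independent flag on the retry counter can be sketched with plain objects. Everything below is a hypothetical stand-in: FakePauseTracker only counts consecutive failures, and FakeProcessor resets it on marking only when independent mode is on and a retry is in progress.

```ruby
# Hypothetical pause tracker that only counts consecutive failed attempts.
class FakePauseTracker
  attr_reader :attempt

  def initialize
    @attempt = 0
  end

  def increment
    @attempt += 1
  end

  def reset
    @attempt = 0
  end
end

# Hypothetical processor illustrating the independent flag.
class FakeProcessor
  attr_reader :tracker

  def initialize(independent:)
    @independent = independent
    @tracker = FakePauseTracker.new
  end

  # Simulates a failed processing attempt.
  def fail!
    @tracker.increment
  end

  # Marking resets the counter only while retrying in independent mode.
  def mark_as_consumed
    @tracker.reset if @independent && @tracker.attempt.positive?
    true
  end
end

independent = FakeProcessor.new(independent: true)
2.times { independent.fail! }
independent.mark_as_consumed
independent.tracker.attempt # 0, the next failure counts as a first attempt

shared = FakeProcessor.new(independent: false)
2.times { shared.fail! }
shared.mark_as_consumed
shared.tracker.attempt # 2, failures keep accumulating across markings
```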
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
Override of the standard #mark_as_consumed!. Resets the pause tracker count in case DLQ was configured with the independent flag.
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 48

    def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
      return super unless retrying?
      return super unless topic.dead_letter_queue.independent?
      return false unless super

      coordinator.pause_tracker.reset

      true
    ensure
      @_current_offset_metadata = nil
    end
#mark_dispatched_to_dlq(skippable_message) ⇒ Object
Marks message that went to DLQ (if applicable) based on the requested method
    # File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 234

    def mark_dispatched_to_dlq(skippable_message)
      case topic.dead_letter_queue.marking_method
      when :mark_as_consumed
        mark_as_consumed(skippable_message)
      when :mark_as_consumed!
        mark_as_consumed!(skippable_message)
      else
        # This should never happen. Bug if encountered. Please report
        raise Karafka::Errors::UnsupportedCaseError
      end
    end