Module: Karafka::Pro::Processing::Strategies::Default

Overview

No features enabled. No manual offset management. No long-running jobs. No virtual partitions. Nothing. Just the standard, automatic flow.

Constant Summary

FEATURES =

Apply strategy for a non-feature based flow

%i[].freeze

Instance Method Summary

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown, #handle_wrap

Methods included from Karafka::Processing::Strategies::Base

#handle_idle, #handle_shutdown

Instance Method Details

#handle_after_consume ⇒ Object

Standard flow without any features



# File 'lib/karafka/pro/processing/strategies/default.rb', line 337

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      # Do not mark the last message if a pause happened. This prevents a scenario where
      # the pause is overridden upon rebalance by marking
      return if coordinator.manual_pause?

      mark_as_consumed(last_group_message)
    else
      retry_after_pause
    end
  end
end

#handle_before_consume ⇒ Object

Increments the number of attempts per one “full” job. For all VPs on a single topic partition this should also run only once.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 304

def handle_before_consume
  coordinator.on_started do
    coordinator.pause_tracker.increment
  end
end

#handle_before_schedule_consume ⇒ Object

No actions needed for the standard flow here



# File 'lib/karafka/pro/processing/strategies/default.rb', line 296

def handle_before_schedule_consume
  monitor.instrument('consumer.before_schedule_consume', caller: self)

  nil
end

#handle_before_schedule_tick ⇒ Object

No action needed for the standard tick flow



# File 'lib/karafka/pro/processing/strategies/default.rb', line 372

def handle_before_schedule_tick
  monitor.instrument('consumer.before_schedule_tick', caller: self)

  nil
end

#handle_consume ⇒ Object

Runs the user consumption code



# File 'lib/karafka/pro/processing/strategies/default.rb', line 311

def handle_consume
  # We should not run the work at all on a partition that was revoked
  # This can happen primarily when an LRJ job gets to the internal worker queue and
  # this partition is revoked prior to processing.
  unless revoked?
    monitor.instrument('consumer.consume', caller: self)
    monitor.instrument('consumer.consumed', caller: self) do
      consume
    end
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  # If failed, mark as failed
  coordinator.failure!(self, e)

  # Re-raise so it gets reported in the consumer
  raise e
ensure
  # We need to decrease the number of jobs that this coordinator coordinates as it has
  # finished
  coordinator.decrement(:consume)
end
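
The consumer.consume and consumer.consumed events above wrap the user-defined #consume method. A minimal sketch of such a consumer (the class name and processing logic are illustrative, not part of this API):

class EventsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      # Any error raised here is captured by the strategy above, recorded as a
      # failure on the coordinator and re-raised
      puts "Processing offset #{message.offset}"
    end
  end
end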

#handle_revoked ⇒ Object

Standard flow for revocation



# File 'lib/karafka/pro/processing/strategies/default.rb', line 356

def handle_revoked
  coordinator.on_revoked do
    resume

    coordinator.revoke
  end

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end
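
The consumer.revoke and consumer.revoked events above wrap the user-defined #revoked hook, if one is defined. A sketch (class name and cleanup logic are illustrative):

class EventsConsumer < ApplicationConsumer
  def consume
    messages.each { |message| puts message.offset }
  end

  # Invoked via #handle_revoked when this partition's assignment is lost
  def revoked
    # Release locks or flush partition-bound state here
  end
end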

#handle_tick ⇒ Object

Runs the consumer #tick method with reporting



# File 'lib/karafka/pro/processing/strategies/default.rb', line 379

def handle_tick
  monitor.instrument('consumer.tick', caller: self)
  monitor.instrument('consumer.ticked', caller: self) do
    tick
  end
ensure
  coordinator.decrement(:periodic)
end
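
A sketch of a consumer defining #tick, assuming periodic ticking is enabled for its topic (class name and tick logic are illustrative):

class MaintenanceConsumer < ApplicationConsumer
  def consume
    messages.each { |message| puts message.payload }
  end

  # Invoked via #handle_tick on periodic runs
  def tick
    puts "Tick at #{Time.now}"
  end
end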

#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Note:

We keep track of this offset in case we mark as consumed and then get an error when processing another message. In a case like this we do not pause on the message we’ve already processed but rather on the next one. This applies to both the sync and async versions of this method.

Marks message as consumed in an async way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able to do so and that we have lost the partition.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 48

def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  # If we are inside a transaction, then we can just mark as consumed within it
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, true)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore if it is the same offset as the one that is marked currently
    # We ignore second marking because it changes nothing and in case of people using
    # metadata storage but with automatic offset marking, this would cause metadata to be
    # erased by automatic marking
    return true if (seek_offset - 1) == message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, true)
             elsif @_transactional_marking
               raise Errors::NonTransactionalMarkingAttemptError
             else
               client.mark_as_consumed(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
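
A usage sketch of the async marking from within #consume (persist is a hypothetical helper, not part of this API):

def consume
  messages.each do |message|
    persist(message.payload) # hypothetical processing step

    # Async marking stores the offset for the regular commit flow; a false
    # result means the partition assignment was lost, so we stop
    return unless mark_as_consumed(message)
  end
end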

#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Marks message as consumed in a sync way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able to do so and that we have lost the partition.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 91

def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, false)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore if it is the same offset as the one that is marked currently
    # We ignore second marking because it changes nothing and in case of people using
    # metadata storage but with automatic offset marking, this would cause metadata to be
    # erased by automatic marking
    return true if (seek_offset - 1) == message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, false)
             elsif @_transactional_marking
               raise Errors::NonTransactionalMarkingAttemptError
             else
               client.mark_as_consumed!(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
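
The sync variant can be used the same way when the commit needs to be acknowledged before processing continues (a sketch; persist is a hypothetical helper):

def consume
  messages.each do |message|
    persist(message.payload) # hypothetical processing step

    # Blocks until the offset commit is acknowledged; false means the
    # partition assignment was lost
    return unless mark_as_consumed!(message)
  end
end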

#mark_in_memory(message) ⇒ Boolean

Marks the current state only in memory as the offset marking has already happened using the producer transaction

Parameters:

  • message (Messages::Message)

    message whose offset should be reflected in the in-memory seek state

Returns:

  • (Boolean)

    true if all good, false if we lost assignment and no point in marking



# File 'lib/karafka/pro/processing/strategies/default.rb', line 276

def mark_in_memory(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if seek_offset.nil?
  # Ignore if it is the same offset as the one that is marked currently
  # We ignore second marking because it changes nothing and in case of people using
  # metadata storage but with automatic offset marking, this would cause metadata to be
  # erased by automatic marking
  return true if (seek_offset - 1) == message.offset
  return false if revoked?

  # If we have already marked this successfully in a transaction that was running
  # we should not mark it again with the client offset delegation but instead we should
  # just align the in-memory state
  self.seek_offset = message.offset + 1

  true
end

#mark_in_transaction(message, offset_metadata, async) ⇒ Object

Stores the next offset for processing inside of the transaction and adds it to a local accumulator for the post-transaction status update

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in an async or a sync way (applicable only to the post-transaction state synchronization usage, as within a transaction it is always sync)

Raises:

  • (Errors::TransactionRequiredError)

    when invoked outside of an active transaction

  • (Errors::AssignmentLostError)

    when the partition assignment was lost

# File 'lib/karafka/pro/processing/strategies/default.rb', line 228

def mark_in_transaction(message, offset_metadata, async)
  raise Errors::TransactionRequiredError unless @_in_transaction
  raise Errors::AssignmentLostError if revoked?

  producer.transaction_mark_as_consumed(
    client,
    message,
    offset_metadata
  )

  # This one is long lived and used to make sure, that users do not mix transactional
  # marking with non-transactional. When this happens we should raise error
  @_transactional_marking = true
  @_in_transaction_marked = true
  @_transaction_marked ||= []
  @_transaction_marked << [message, offset_metadata, async]
end

#mark_with_transaction(message, offset_metadata, async) ⇒ Boolean

Returns false if marking failed, otherwise true.

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in an async or a sync way (applicable only to the post-transaction state synchronization usage, as within a transaction it is always sync)

Returns:

  • (Boolean)

    false if marking failed, otherwise true



# File 'lib/karafka/pro/processing/strategies/default.rb', line 252

def mark_with_transaction(message, offset_metadata, async)
  # This flag is used by VPs to differentiate between user initiated transactions and
  # post-execution system transactions.
  @_transaction_internal = true

  transaction do
    mark_in_transaction(message, offset_metadata, async)
  end

  true
# We handle both cases here because this is a private API for internal usage and we want
# the post-user code execution marking with transactional producer to result in a
# boolean state of marking for further framework flow. This is a normalization to make it
# behave the same way as it would behave with a non-transactional one
rescue ::Rdkafka::RdkafkaError, Errors::AssignmentLostError
  false
ensure
  @_transaction_internal = false
end

#store_offset_metadata(offset_metadata) ⇒ Object

Note:

Please be aware that offset metadata set this way will be passed to any marking as consumed, even if it was not user initiated, for example in the DLQ flow.

Allows setting offset metadata that will be used with the upcoming marking as consumed, as long as different offset metadata was not used. After it is used, either via #mark_as_consumed or #mark_as_consumed!, it will be set back to nil. It is done that way to provide the end user with the ability to influence metadata on non-user-initiated markings in complex flows.

Parameters:

  • offset_metadata (String, nil)

    metadata we want to store with the upcoming marking as consumed



# File 'lib/karafka/pro/processing/strategies/default.rb', line 33

def store_offset_metadata(offset_metadata)
  @_current_offset_metadata = offset_metadata
end
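
A sketch of how metadata stored this way is attached to the next marking (the metadata string is illustrative):

def consume
  messages.each do |message|
    # Used by the next marking (manual or automatic) and then reset to nil
    store_offset_metadata("processed_at:#{Time.now.to_i}")

    mark_as_consumed(message)
  end
end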

#transaction(active_producer = producer) { ... } ⇒ Object

Note:

Please note that if you provide a producer, it will reassign the consumer's producer for the duration of the transaction. This means that if you refer to Consumer#producer from other threads, even accidentally, it will contain the reassigned producer and not the initially used/assigned one. It is done that way so the message-producing aliases operate from within transactions, and since the producer in a transaction is locked, this prevents other threads from using it.

Starts a producer transaction, saves the transaction context for transactional marking and runs the user code in this context

Transactions on a consumer level differ from those initiated by the producer, as they allow marking offsets inside of the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.

Parameters:

  • active_producer (WaterDrop::Producer) (defaults to: producer)

    alternative producer instance we may want to use. It is useful when we have a connection pool or any other selective engine for managing multiple producers. If not provided, the default producer taken from #producer will be used.

Yields:

  • code that we want to run in a transaction



# File 'lib/karafka/pro/processing/strategies/default.rb', line 147

def transaction(active_producer = producer)
  default_producer = nil
  transaction_started = nil

  monitor.instrument('consumer.consuming.transaction', caller: self) do
    default_producer = producer
    self.producer = active_producer

    transaction_started = false
    transaction_completed = false

    # Prevent nested transactions. They would not make any sense
    raise Errors::TransactionAlreadyInitializedError if @_in_transaction

    transaction_started = true
    @_transaction_marked = []
    @_in_transaction = true
    @_in_transaction_marked = false

    producer.transaction do
      yield

      # Ensure this transaction is rolled back if we have lost the ownership of this
      # transaction. We do it only for transactions that contain offset management, as for
      # producer-only transactions this is not relevant.
      raise Errors::AssignmentLostError if @_in_transaction_marked && revoked?

      # If we do not reach this, we should not move seek offsets because it means that
      # either an error occurred or the transaction was aborted.
      # In case of an error it will bubble up, so no issue, but in case of an abort,
      # while we do not reach this place, the code will continue
      transaction_completed = true
    end

    @_in_transaction = false

    # This offset is already stored in transaction but we set it here anyhow because we
    # want to make sure our internal in-memory state is aligned with the transaction
    #
    # @note We never need to use the blocking `#mark_as_consumed!` here because the
    #   offset anyhow was already stored during the transaction
    #
    # @note Since the offset could have been already stored in Kafka (could have because
    #   you can have transactions without marking), we use the `@_in_transaction_marked`
    #   state to decide if we need to dispatch the offset via client at all
    #   (if post transaction, then we do not have to)
    #
    # @note In theory we could only keep reference to the most recent marking and reject
    #   others. We however do not do it for two reasons:
    #   - User may have non standard flow relying on some alternative order and we want
    #     to mimic this
    #   - Complex strategies like VPs can use this to mark in parallel without
    #     having to redefine the transactional flow completely
    #
    # @note This should be applied only if transaction did not error and if it was not
    #   aborted.
    if transaction_completed
      @_transaction_marked.each do |marking|
        marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking)
      end
    end

    true
  end
ensure
  self.producer = default_producer

  if transaction_started
    @_transaction_marked.clear
    @_in_transaction = false
    @_in_transaction_marked = false
  end
end
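
A usage sketch of a consumer-level transaction that combines producing messages with offset marking (the target topic name is illustrative):

def consume
  transaction do
    messages.each do |message|
      producer.produce_async(topic: 'events_enriched', payload: message.raw_payload)
    end

    # Inside the transaction this delegates to #mark_in_transaction, so the
    # offset is stored together with the produced messages
    mark_as_consumed(messages.last)
  end
end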