Module: Karafka::Routing::Features::DeadLetterQueue::Topic

Defined in:
lib/karafka/routing/features/dead_letter_queue/topic.rb

Overview

DLQ topic extensions

Instance Method Details

#dead_letter_queue(max_retries: DEFAULT_MAX_RETRIES, topic: nil, independent: false, transactional: true, dispatch_method: :produce_async, marking_method: :mark_as_consumed, mark_after_dispatch: nil) ⇒ Config

Returns defined config.

Parameters:

  • max_retries (Integer) (defaults to: DEFAULT_MAX_RETRIES)

    number of retries after which the message is moved to the DLQ

  • topic (String, false) (defaults to: nil)

    topic to which failing messages are moved, or false to skip them without moving them anywhere

  • independent (Boolean) (defaults to: false)

    must be true for each marking as consumed in a retry flow to reset the error counter

  • transactional (Boolean) (defaults to: true)

    whether, if applicable, a transaction should be used to move the given message to the dead-letter topic and mark it as consumed

  • dispatch_method (Symbol) (defaults to: :produce_async)

    :produce_async or :produce_sync. Determines whether dispatch to the DLQ is asynchronous or synchronous (asynchronous by default)

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    :mark_as_consumed or :mark_as_consumed!. Determines whether marking on the DLQ is asynchronous or synchronous (asynchronous by default)

  • mark_after_dispatch (Boolean, nil) (defaults to: nil)

    whether to mark after dispatch. nil means the default marking approach of the strategy is used; true or false overrides that default

Returns:

  • (Config)

    defined config


# File 'lib/karafka/routing/features/dead_letter_queue/topic.rb', line 29

def dead_letter_queue(
  max_retries: DEFAULT_MAX_RETRIES,
  topic: nil,
  independent: false,
  transactional: true,
  dispatch_method: :produce_async,
  marking_method: :mark_as_consumed,
  mark_after_dispatch: nil
)
  @dead_letter_queue ||= Config.new(
    active: !topic.nil?,
    max_retries: max_retries,
    topic: topic,
    independent: independent,
    transactional: transactional,
    dispatch_method: dispatch_method,
    marking_method: marking_method,
    mark_after_dispatch: mark_after_dispatch
  )
end
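
Because the method above memoizes its result with `||=`, only the arguments of the first call take effect, and later calls return the same object. The following is a minimal sketch of that pattern, not Karafka's own code: `SketchConfig`, `SketchTopic` and `SKETCH_MAX_RETRIES` are hypothetical stand-ins, and the value 3 is only a placeholder for `DEFAULT_MAX_RETRIES`.

```ruby
# Hypothetical stand-in for the real Config class, for illustration only
SketchConfig = Struct.new(:active, :max_retries, :topic, keyword_init: true)

class SketchTopic
  # Placeholder value; the real DEFAULT_MAX_RETRIES is defined by Karafka
  SKETCH_MAX_RETRIES = 3

  def dead_letter_queue(max_retries: SKETCH_MAX_RETRIES, topic: nil)
    # ||= builds the config on the first call and reuses it afterwards
    @dead_letter_queue ||= SketchConfig.new(
      active: !topic.nil?,
      max_retries: max_retries,
      topic: topic
    )
  end
end

sketch_topic = SketchTopic.new
first = sketch_topic.dead_letter_queue(topic: 'orders_dlq', max_retries: 2)
second = sketch_topic.dead_letter_queue(topic: 'ignored', max_retries: 10)

puts first.equal?(second) # true: the same object is returned
puts second.topic         # orders_dlq: arguments of the second call are ignored
```

In this sketch, calling the method without arguments after it has been configured simply reads back the stored config.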

#dead_letter_queue? ⇒ Boolean

Returns whether the DLQ is active.

Returns:

  • (Boolean)

    whether the DLQ is active



# File 'lib/karafka/routing/features/dead_letter_queue/topic.rb', line 51

def dead_letter_queue?
  dead_letter_queue.active?
end
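
The active flag comes from the `!topic.nil?` expression in the `#dead_letter_queue` source, so a topic of false (skip without dispatching) still counts as active, while nil does not. A small sketch of that check, using a hypothetical helper name that is not part of Karafka:

```ruby
# Mirrors the `!topic.nil?` check; the helper name is hypothetical
def sketch_dlq_active?(topic)
  !topic.nil?
end

puts sketch_dlq_active?(nil)          # false: DLQ not configured
puts sketch_dlq_active?(false)        # true: failing messages are skipped, not moved
puts sketch_dlq_active?('orders_dlq') # true: failing messages go to that topic
```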

#to_h ⇒ Hash

Returns topic with all its native configuration options plus dlq settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus dlq settings



# File 'lib/karafka/routing/features/dead_letter_queue/topic.rb', line 56

def to_h
  super.merge(
    dead_letter_queue: dead_letter_queue.to_h
  ).freeze
end
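
The `super.merge(...).freeze` pattern above can be sketched in isolation as follows. `SketchBaseTopic` and `SketchDlqExtension` are hypothetical, and wiring the extension in with `prepend` is an assumption made for the example rather than a statement about how Karafka loads its features.

```ruby
# Hypothetical extension that layers DLQ settings on top of the base hash
module SketchDlqExtension
  def to_h
    # super returns the base hash; merge builds a new hash, which is then frozen
    super.merge(
      dead_letter_queue: { active: true, topic: 'orders_dlq' }
    ).freeze
  end
end

# Hypothetical base class standing in for the topic's native #to_h
class SketchBaseTopic
  prepend SketchDlqExtension

  def to_h
    { name: 'orders' }
  end
end

result = SketchBaseTopic.new.to_h
puts result.keys.inspect # [:name, :dead_letter_queue]
puts result.frozen?      # true
```

Note that `freeze` is shallow in Ruby: the outer hash cannot be modified, but nested values are frozen only if they were frozen themselves.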