Module: Karafka::Pro::Processing::Strategies::Vp::Default

Includes:
Default
Included in:
Aj::DlqMomVp, Aj::FtrLrjMomVp, Aj::LrjMomVp, Aj::MomVp, Dlq::FtrLrjMomVp, Dlq::FtrLrjVp, Dlq::FtrVp, Dlq::LrjVp, Dlq::Vp, Ftr::Vp, Lrj::FtrVp, Lrj::Vp, Mom::Vp
Defined in:
lib/karafka/pro/processing/strategies/vp/default.rb

Overview

Just Virtual Partitions enabled

Constant Summary

FEATURES =

Features for this strategy

%i[
  virtual_partitions
].freeze

Instance Method Summary

Methods included from Default

#handle_after_consume, #handle_before_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_in_memory, #mark_with_transaction, #store_offset_metadata, #transaction

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_after_consume, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap

Methods included from Karafka::Processing::Strategies::Base

#handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Instance Method Details

#collapse_until!(offset) ⇒ Object

Note:

Keep in mind that if a batch contains this offset but also earlier messages that should be collapsed, all of them will continue to operate in a collapsed mode until the first full batch that contains only messages that should not be collapsed.

Parameters:

  • offset (Integer)

    first offset from which we should not operate in a collapsed mode.



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 122

def collapse_until!(offset)
  coordinator.collapse_until!(offset)
end
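
The note above can be illustrated with a minimal sketch. It is a toy model and not Karafka's implementation: `ToyCoordinator` and its `refresh!` method are hypothetical names, and the assumption is that the collapsed state is re-evaluated once per batch based on the first offset of that batch.

```ruby
# Hypothetical stand-in for the coordinator; names are illustrative only.
class ToyCoordinator
  def initialize
    @until_offset = nil
    @collapsed = false
  end

  # Remember the first offset from which collapsing is no longer needed.
  def collapse_until!(offset)
    @until_offset = [@until_offset, offset].compact.max
  end

  # Assumed to run once per incoming batch with that batch's first offset.
  def refresh!(first_offset)
    @collapsed = !@until_offset.nil? && first_offset < @until_offset
  end

  def collapsed?
    @collapsed
  end
end

coordinator = ToyCoordinator.new
coordinator.collapse_until!(100)

# A batch starting at offset 90 still contains messages below 100,
# so the whole batch runs in a collapsed mode.
coordinator.refresh!(90)
puts coordinator.collapsed?

# A later batch starting at 111 contains only messages past the boundary.
coordinator.refresh!(111)
puts coordinator.collapsed?
```

In this model a batch that straddles the boundary stays collapsed as a whole, which matches the behavior described in the note.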

#collapsed? ⇒ Boolean

Returns whether virtual processing is collapsed in the context of the given consumer.

Returns:

  • (Boolean)

    whether virtual processing is collapsed in the context of the given consumer.



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 113

def collapsed?
  coordinator.collapsed?
end

#failing? ⇒ Boolean

Note:

We’ve named it #failing? instead of #failure? because it is meant to be used from within virtual partitions, where we want a notion of collective failure rather than one that is only “local” to our processing. We “are” failing when other virtual partitions raise an error, even though locally we are still processing.

Returns true if any of the virtual partitions we are operating with in the entangled mode has already failed and we know we are failing collectively. Useful for stopping early to minimize the amount of work done twice.

Returns:

  • (Boolean)

    true if any of the virtual partitions we are operating with in the entangled mode has already failed and we know we are failing collectively. Useful for stopping early to minimize the amount of work done twice.



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 134

def failing?
  coordinator.failure?
end
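
A minimal sketch of the early-stop pattern this method enables. `ToyConsumer` is a hypothetical stand-in, and its `failing?` is stubbed with a counter; in Karafka the value comes from the shared coordinator as shown above.

```ruby
# Hypothetical stand-in for a consumer running in one virtual partition.
class ToyConsumer
  attr_reader :processed

  def initialize(messages, failing_after:)
    @messages = messages
    @failing_after = failing_after
    @processed = []
  end

  # Stubbed: pretends another virtual partition fails after N messages.
  def failing?
    @processed.size >= @failing_after
  end

  def consume
    @messages.each do |message|
      # Stop early once a collective failure is known.
      break if failing?

      @processed << message
    end
  end
end

consumer = ToyConsumer.new([1, 2, 3, 4, 5], failing_after: 2)
consumer.consume
puts consumer.processed.inspect # prints [1, 2]
```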

#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object

Note:

This virtual offset management uses the regular default marking API underneath. We do not alter the “real” marking API, as VPs are just one of many cases we want to support and we do not want to impact the others with collective offset management.

Parameters:

  • message (Karafka::Messages::Message)

    message to mark as consumed

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)


# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 36

def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction && !collapsed?
    mark_in_transaction(message, offset_metadata, true)
  elsif collapsed?
    super
  else
    manager = coordinator.virtual_offset_manager

    coordinator.synchronize do
      manager.mark(message, offset_metadata)
      # If this is last marking on a finished flow, we can use the original
      # last message and in order to do so, we need to mark all previous messages as
      # consumed as otherwise the computed offset could be different
      # We mark until our offset just in case of a DLQ flow or similar, where we do not
      # want to mark all but until the expected location
      manager.mark_until(message, offset_metadata) if coordinator.finished?

      return revoked? unless manager.markable?

      manager.markable? ? super(*manager.markable) : revoked?
    end
  end
ensure
  @_current_offset_metadata = nil
end
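
The idea behind `markable?` and `markable` can be sketched with a toy model. This is an assumption-laden simplification, not the real virtual offset manager: `ToyOffsetManager` is a hypothetical class that tracks plain integer offsets and treats an offset as safe to commit only when every earlier offset in the batch has been marked.

```ruby
# Toy model only: the real virtual offset manager is more involved.
class ToyOffsetManager
  def initialize(offsets)
    @offsets = offsets.sort
    @marked = {}
  end

  # Record that a single offset has been processed.
  def mark(offset)
    @marked[offset] = true
  end

  # Highest offset for which every earlier offset in the batch is marked,
  # or nil when the first offset of the batch is still unmarked.
  def markable
    @offsets.take_while { |offset| @marked[offset] }.last
  end

  def markable?
    !markable.nil?
  end
end

manager = ToyOffsetManager.new([10, 11, 12, 13])

manager.mark(12) # a later virtual partition finishes first
puts manager.markable?

manager.mark(10)
puts manager.markable.inspect

manager.mark(11)
puts manager.markable.inspect
```

Marking offset 12 first commits nothing in this model, because offsets 10 and 11 could still fail and would otherwise be skipped after a restart.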

#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object

Parameters:

  • message (Karafka::Messages::Message)

    message to mark as consumed in a blocking way

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)


# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 64

def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction && !collapsed?
    mark_in_transaction(message, offset_metadata, false)
  elsif collapsed?
    super
  else
    manager = coordinator.virtual_offset_manager

    coordinator.synchronize do
      manager.mark(message, offset_metadata)
      manager.mark_until(message, offset_metadata) if coordinator.finished?
      manager.markable? ? super(*manager.markable) : revoked?
    end
  end
ensure
  @_current_offset_metadata = nil
end

#mark_in_transaction(message, offset_metadata, async) ⇒ Object

Stores the next offset for processing inside of the transaction when collapsed and accumulates marking as consumed in the local buffer.

Due to the nature of VPs we cannot provide full EOS support, but we can simulate it by making sure that no offsets are stored unless the transaction has finished. We do this by accumulating the post-transaction marking requests and, once the transaction completes successfully, marking each of them as consumed. On errors this effectively “rolls back” the state and prevents offset storage.

Since the EOS here is “weak”, we do not have to worry about race conditions and we do not need any mutexes.

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in async or sync way (applicable only to post transaction state synchronization usage as within transaction it is always sync)

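Raises:

  • (Errors::TransactionRequiredError)

    when called outside of a transaction

  • (Errors::AssignmentLostError)

    when the assignment has been revoked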

# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 98

def mark_in_transaction(message, offset_metadata, async)
  raise Errors::TransactionRequiredError unless @_in_transaction
  # Prevent from attempts of offset storage when we no longer own the assignment
  raise Errors::AssignmentLostError if revoked?

  return super if collapsed?
  # If this is user post-execution transaction (one initiated by the system) we should
  # delegate to the original implementation that will store the offset via the producer
  return super if @_transaction_internal

  @_transaction_marked << [message, offset_metadata, async]
end
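
The buffering approach described above can be sketched as follows. This is a toy model under stated assumptions: `ToyTransactionalMarker` and its `transaction` method are hypothetical, no Kafka producer is involved, and offsets are plain integers. It only shows how marks collected during a transaction are applied on success and dropped on error.

```ruby
# Toy model: names are hypothetical and nothing talks to Kafka.
class ToyTransactionalMarker
  attr_reader :committed

  def initialize
    @committed = []
    @buffer = []
  end

  # Accumulate the marking request instead of applying it right away.
  def mark_in_transaction(offset)
    @buffer << offset
  end

  def transaction
    @buffer.clear
    yield
    # Reached only when the block did not raise: apply the buffered marks.
    @committed.concat(@buffer)
  ensure
    # On success or failure, leave no buffered marks behind.
    @buffer.clear
  end
end

marker = ToyTransactionalMarker.new
marker.transaction { marker.mark_in_transaction(5) }

begin
  marker.transaction do
    marker.mark_in_transaction(6)
    raise "processing failed"
  end
rescue RuntimeError
  # The buffered mark for offset 6 was discarded.
end

puts marker.committed.inspect # prints [5]
```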

#synchronize(&block) ⇒ Object

Allows for cross-virtual-partition consumer locks.

This is not needed in the non-VP flows except LRJ because there is always only one consumer per partition at the same time, so no coordination is needed directly for the end users. With LRJ it is needed and provided in the LRJ::Default strategy, because lifecycle events on revocation can run in parallel to the LRJ job as it is non-blocking.

Parameters:

  • block (Proc)

    block we want to run in a mutex to prevent race-conditions



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 147

def synchronize(&block)
  coordinator.shared_mutex.synchronize(&block)
end
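
A minimal sketch of why a shared mutex matters when several virtual partition consumers update the same state. `ToySharedCoordinator` and `ToyVpConsumer` are hypothetical stand-ins; only the delegation to a shared `Mutex` mirrors the method above.

```ruby
# Hypothetical stand-in: one mutex shared by all virtual partition consumers.
class ToySharedCoordinator
  attr_reader :shared_mutex

  def initialize
    @shared_mutex = Mutex.new
  end
end

# Hypothetical consumer that guards shared state with the coordinator mutex.
class ToyVpConsumer
  def initialize(coordinator, totals)
    @coordinator = coordinator
    @totals = totals
  end

  def synchronize(&block)
    @coordinator.shared_mutex.synchronize(&block)
  end

  def consume(values)
    values.each do |value|
      # The read-modify-write on shared state happens under the lock.
      synchronize { @totals[:sum] += value }
    end
  end
end

coordinator = ToySharedCoordinator.new
totals = { sum: 0 }

# Four threads play the role of four virtual partitions of one partition.
threads = 4.times.map do
  Thread.new { ToyVpConsumer.new(coordinator, totals).consume(1..1000) }
end
threads.each(&:join)

puts totals[:sum] # prints 2002000
```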