Class: Karafka::Pro::Processing::Filters::VirtualLimiter

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/pro/processing/filters/virtual_limiter.rb

Overview

Note:

It should be registered only when VPs are used

Removes messages that are already marked as consumed in the virtual offset manager This should operate only when using virtual partitions.

This cleaner prevents us from duplicated processing of messages that were virtually marked as consumed even if we could not mark them as consumed in Kafka. This allows us to limit reprocessing when errors occur drastically when operating with virtual partitions

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary collapse

Methods inherited from Base

#action, #applied?, #mark_as_consumed?, #marking_method, #timeout

Constructor Details

#initialize(manager, collapser) ⇒ VirtualLimiter

Returns a new instance of VirtualLimiter.

Parameters:



22
23
24
25
26
27
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 22

def initialize(manager, collapser)
  @manager = manager
  @collapser = collapser

  super()
end

Instance Method Details

#apply!(messages) ⇒ Object

Remove messages that we already marked as virtually consumed. Does nothing if not in the collapsed mode.

Parameters:



33
34
35
36
37
38
39
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 33

def apply!(messages)
  return unless @collapser.collapsed?

  marked = @manager.marked

  messages.delete_if { |message| marked.include?(message.offset) }
end