Class: Karafka::Pro::Processing::Filters::VirtualLimiter
- Defined in:
- lib/karafka/pro/processing/filters/virtual_limiter.rb
Overview
Note:
It should be registered only when VPs are used
Removes messages that are already marked as consumed in the virtual offset manager This should operate only when using virtual partitions.
This cleaner prevents us from duplicated processing of messages that were virtually marked as consumed even if we could not mark them as consumed in Kafka. This allows us to limit reprocessing when errors occur drastically when operating with virtual partitions
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#apply!(messages) ⇒ Object
Remove messages that we already marked as virtually consumed.
-
#initialize(manager, collapser) ⇒ VirtualLimiter
constructor
A new instance of VirtualLimiter.
Methods inherited from Base
#action, #applied?, #mark_as_consumed?, #marking_method, #timeout
Constructor Details
#initialize(manager, collapser) ⇒ VirtualLimiter
Returns a new instance of VirtualLimiter.
22 23 24 25 26 27 |
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 22 def initialize(manager, collapser) @manager = manager @collapser = collapser super() end |
Instance Method Details
#apply!(messages) ⇒ Object
Remove messages that we already marked as virtually consumed. Does nothing if not in the collapsed mode.
33 34 35 36 37 38 39 |
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 33 def apply!() return unless @collapser.collapsed? marked = @manager.marked .delete_if { || marked.include?(.offset) } end |