Class: Karafka::Pro::Processing::Filters::VirtualLimiter
- Defined in:
- lib/karafka/pro/processing/filters/virtual_limiter.rb
Overview
It should be registered only when VPs are used
Removes messages that are already marked as consumed in the virtual offset manager This should operate only when using virtual partitions.
This cleaner prevents us from duplicated processing of messages that were virtually marked as consumed even if we could not mark them as consumed in Kafka. This allows us to limit reprocessing when errors occur drastically when operating with virtual partitions
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#apply!(messages) ⇒ Object
Remove messages that we already marked as virtually consumed.
-
#initialize(manager, collapser) ⇒ VirtualLimiter
constructor
A new instance of VirtualLimiter.
-
#timeout ⇒ nil
This filter does not deal with pausing, so timeout is always nil.
Methods inherited from Base
#action, #applied?, #mark_as_consumed?, #marking_cursor, #marking_method
Constructor Details
#initialize(manager, collapser) ⇒ VirtualLimiter
Returns a new instance of VirtualLimiter.
22 23 24 25 26 27 |
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 22 def initialize(manager, collapser) @manager = manager @collapser = collapser super() end |
Instance Method Details
#apply!(messages) ⇒ Object
Remove messages that we already marked as virtually consumed. Does nothing if not in the collapsed mode.
33 34 35 36 37 38 39 |
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 33 def apply!() return unless @collapser.collapsed? marked = @manager.marked .delete_if { || marked.include?(.offset) } end |
#timeout ⇒ nil
Returns This filter does not deal with pausing, so timeout is always nil.
42 43 44 |
# File 'lib/karafka/pro/processing/filters/virtual_limiter.rb', line 42 def timeout nil end |