Class: Karafka::Pro::Processing::Filters::Expirer
Defined in: lib/karafka/pro/processing/filters/expirer.rb
Overview
Expirer that removes messages that are too old. It never moves offsets and does not affect the processing flow. It always runs the :skip action.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #apply!(messages) ⇒ Object
  Removes messages that are too old.
- #initialize(ttl) ⇒ Expirer (constructor)
  A new instance of Expirer.
Methods inherited from Base
#action, #applied?, #mark_as_consumed?, #marking_method, #timeout
Constructor Details
#initialize(ttl) ⇒ Expirer
Returns a new instance of Expirer.
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 23

    def initialize(ttl)
      super()
      @ttl = ttl
    end
Instance Method Details
#apply!(messages) ⇒ Object
Removes messages that are too old. The messages array is modified in place.

# File 'lib/karafka/pro/processing/filters/expirer.rb', line 32

    def apply!(messages)
      @applied = false

      # Time on message is in seconds with ms precision, so we need to convert the ttl that
      # is in ms to this format
      border = ::Time.now.utc - @ttl / 1_000.to_f

      messages.delete_if do |message|
        too_old = message.timestamp < border

        @applied = true if too_old

        too_old
      end
    end
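The border computation and in-place removal can be tried outside Karafka with a minimal sketch. Everything named here is a hypothetical stand-in and not part of the library: StubMessage replaces a real Karafka message (only its timestamp is used), SketchExpirer restates the filtering logic without inheriting from Base, and the now: keyword is an assumption added so the example is deterministic.

```ruby
# Hypothetical stand-in for a Karafka message; only the timestamp matters here.
StubMessage = Struct.new(:offset, :timestamp)

# Standalone restatement of the filtering logic (not the Karafka class itself).
class SketchExpirer
  def initialize(ttl)
    @ttl = ttl # time-to-live in milliseconds
    @applied = false
  end

  # True when the last apply! call removed at least one message.
  def applied?
    @applied
  end

  # The now: keyword is an assumption of this sketch; the original method
  # reads ::Time.now.utc internally.
  def apply!(messages, now: ::Time.now.utc)
    @applied = false

    # Time arithmetic works in seconds, so the millisecond ttl is converted first.
    border = now - @ttl / 1_000.to_f

    # delete_if mutates the array that was passed in.
    messages.delete_if do |message|
      too_old = message.timestamp < border
      @applied = true if too_old
      too_old
    end
  end
end

now = Time.now.utc
messages = [
  StubMessage.new(0, now - 10), # 10 s old, older than a 5 s ttl
  StubMessage.new(1, now - 1),  # 1 s old, still fresh
  StubMessage.new(2, now)       # brand new
]

expirer = SketchExpirer.new(5_000)
expirer.apply!(messages, now: now)

remaining_offsets = messages.map(&:offset) # => [1, 2]
```

With a ttl of 5_000 ms the border sits 5 seconds before now, so only the message at offset 0 is dropped and applied? returns true. Calling apply! again on a batch with no expired messages resets the flag to false.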