Class: Karafka::Pro::Processing::Filters::Delayer
- Defined in:
- lib/karafka/pro/processing/filters/delayer.rb
Overview
A filter that allows us to delay processing by pausing until time is right.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#action ⇒ Symbol
Action to take on post-filtering.
-
#apply!(messages) ⇒ Object
Removes too young messages.
-
#initialize(delay) ⇒ Delayer
constructor
A new instance of Delayer.
-
#timeout ⇒ Integer
Timeout delay in ms.
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_method
Constructor Details
#initialize(delay) ⇒ Delayer
Returns a new instance of Delayer.
13 14 15 16 17 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 13 def initialize(delay) super() @delay = delay end |
Instance Method Details
#action ⇒ Symbol
Returns action to take on post-filtering.
53 54 55 56 57 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 53 def action return :skip unless applied? timeout <= 0 ? :seek : :pause end |
#apply!(messages) ⇒ Object
Removes too young messages
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 22 def apply!() @applied = false @cursor = nil # Time on message is in seconds with ms precision, so we need to convert the ttl that # is in ms to this format border = ::Time.now.utc - @delay / 1_000.0 .delete_if do || too_young = . > border if too_young @applied = true @cursor ||= end @applied end end |
#timeout ⇒ Integer
Returns timeout delay in ms.
44 45 46 47 48 49 50 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 44 def timeout return 0 unless @cursor timeout = (@delay / 1_000.0) - (::Time.now.utc - @cursor.) timeout <= 0 ? 0 : timeout * 1_000 end |