Class: Karafka::Pro::Processing::Filters::Expirer

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/pro/processing/filters/expirer.rb

Overview

Expirer for removing too old messages. It never moves offsets in any way and does not impact the processing flow. It always runs :skip action.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary collapse

Methods inherited from Base

#action, #applied?, #mark_as_consumed?, #marking_method, #timeout

Constructor Details

#initialize(ttl) ⇒ Expirer

Returns a new instance of Expirer.

Parameters:

  • ttl (Integer)

    maximum age of a message (in ms)



23
24
25
26
27
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 23

def initialize(ttl)
  super()

  @ttl = ttl
end

Instance Method Details

#apply!(messages) ⇒ Object

Removes too old messages

Parameters:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 32

def apply!(messages)
  @applied = false

  # Time on message is in seconds with ms precision, so we need to convert the ttl that
  # is in ms to this format
  border = ::Time.now.utc - @ttl / 1_000.to_f

  messages.delete_if do |message|
    too_old = message.timestamp < border

    @applied = true if too_old

    too_old
  end
end