Class: Karafka::Pro::Processing::Filters::Base

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/filters/base.rb

Overview

Base class for all filters. Every filter, including custom ones, must implement this API.

Because a filter can limit data in a way that requires pausing or seeking (throttling, for example), the API does more than "remove some things from the batch": it also lets a filter control the post-filtering operations that may be needed.

Constructor Details

#initialize ⇒ Base

Returns a new instance of Base.



# File 'lib/karafka/pro/processing/filters/base.rb', line 23

def initialize
  @applied = false
  @cursor = nil
end

Instance Attribute Details

#cursor ⇒ Karafka::Messages::Message? (readonly)

Returns the message to use as the cursor for pausing or seeking, or nil if not applicable.

Returns:

  • (Karafka::Messages::Message, nil)

    the message to use as the cursor for pausing or seeking, or nil if not applicable



# File 'lib/karafka/pro/processing/filters/base.rb', line 19

def cursor
  @cursor
end

Instance Method Details

#action ⇒ Symbol

Returns the action to take on the consumer after the filter runs: :skip, :pause, or :seek.

Returns:

  • (Symbol)

    the action to take on the consumer after the filter runs: :skip, :pause, or :seek



# File 'lib/karafka/pro/processing/filters/base.rb', line 36

def action
  :skip
end

#applied? ⇒ Boolean

Returns whether this filter changed the messages in any way.

Returns:

  • (Boolean)

    whether this filter changed the messages in any way



# File 'lib/karafka/pro/processing/filters/base.rb', line 41

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    array of messages. Keep in mind that it may already be partial because previous filters have run.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/pro/processing/filters/base.rb', line 30

def apply!(messages)
  raise NotImplementedError, 'Implement in a subclass'
end
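
To illustrate the contract described above, here is a minimal sketch of a custom filter. It is not taken from the Karafka source: `Base` below is a local stand-in that mirrors the methods documented on this page so the example runs without Karafka, `Message` is a hypothetical substitute for Karafka::Messages::Message, and `NilPayloadFilter` is an invented name. The sketch also assumes that a filter removes messages by mutating the array in place and that the same filter instance may be reused, so it resets its state on every call.

```ruby
# Local stand-in mirroring the Base API documented on this page (not the real class).
class Base
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def apply!(_messages)
    raise NotImplementedError, 'Implement in a subclass'
  end

  def action
    :skip
  end

  def applied?
    @applied
  end
end

# Hypothetical stand-in for Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that drops messages without a payload.
class NilPayloadFilter < Base
  def apply!(messages)
    # Assumption: the instance may be reused, so reset state on each call.
    @applied = false
    @cursor = nil

    # Assumption: filtering mutates the given array in place.
    messages.delete_if do |message|
      drop = message.payload.nil?
      @applied = true if drop
      drop
    end
  end
end

messages = [Message.new(0, 'a'), Message.new(1, nil), Message.new(2, 'c')]
filter = NilPayloadFilter.new
filter.apply!(messages)

puts messages.map(&:offset).inspect
puts filter.applied?
puts filter.action
```

Because this filter only removes messages and never needs to pause or seek, it keeps the default :skip action and leaves the cursor as nil.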

#mark_as_consumed? ⇒ Boolean

Returns whether the cursor value should be used to mark as consumed. If any of the filters returns true, we return the lowest applicable cursor value (if any).

Returns:

  • (Boolean)

    whether the cursor value should be used to mark as consumed. If any of the filters returns true, we return the lowest applicable cursor value (if any)



# File 'lib/karafka/pro/processing/filters/base.rb', line 52

def mark_as_consumed?
  false
end

#marking_method ⇒ Symbol

Returns :mark_as_consumed or :mark_as_consumed!. Applicable only if marking is requested.

Returns:

  • (Symbol)

    :mark_as_consumed or :mark_as_consumed!. Applicable only if marking is requested



# File 'lib/karafka/pro/processing/filters/base.rb', line 58

def marking_method
  :mark_as_consumed
end
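
As a rough sketch of how #mark_as_consumed? and #marking_method might be overridden together, the hypothetical filter below drops every message below a given offset and asks for the last dropped message to be marked as consumed. `Base` and `Message` are local stand-ins rather than the real Karafka classes, `SkipBelowOffsetFilter` and `min_offset` are invented for illustration, and the sketch assumes messages arrive ordered by offset and are filtered by mutating the array in place.

```ruby
# Local stand-in mirroring the Base API documented on this page (not the real class).
class Base
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    false
  end

  def marking_method
    :mark_as_consumed
  end
end

# Hypothetical stand-in for Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that skips everything below min_offset.
class SkipBelowOffsetFilter < Base
  def initialize(min_offset)
    super()
    @min_offset = min_offset
  end

  def apply!(messages)
    @applied = false
    @cursor = nil

    # Assumption: messages are ordered by offset, so the dropped ones form a
    # prefix and the last dropped message is a safe cursor for marking.
    messages.delete_if do |message|
      drop = message.offset < @min_offset
      if drop
        @applied = true
        @cursor = message
      end
      drop
    end
  end

  # Request marking only when something was actually dropped.
  def mark_as_consumed?
    applied?
  end

  # Ask for the variant named :mark_as_consumed! instead of the default.
  def marking_method
    :mark_as_consumed!
  end
end

messages = (0..4).map { |offset| Message.new(offset, "payload-#{offset}") }
filter = SkipBelowOffsetFilter.new(3)
filter.apply!(messages)

puts messages.map(&:offset).inspect
puts filter.cursor.offset
puts filter.mark_as_consumed?
puts filter.marking_method
```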

#timeout ⇒ Integer

Returns default timeout for pausing (if applicable).

Returns:

  • (Integer)

    default timeout for pausing (if applicable)



# File 'lib/karafka/pro/processing/filters/base.rb', line 46

def timeout
  0
end
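
The interplay of #cursor, #action and #timeout can be sketched with a throttling-style filter. This is an illustrative assumption, not Karafka's own throttler: `Base` and `Message` are local stand-ins, `BatchLimitFilter` and its `limit` parameter are hypothetical, and the timeout value of 500 is arbitrary (the unit is assumed here to be milliseconds, which this page does not state).

```ruby
# Local stand-in mirroring the Base API documented on this page (not the real class).
class Base
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def action
    :skip
  end

  def applied?
    @applied
  end

  def timeout
    0
  end
end

# Hypothetical stand-in for Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that lets at most `limit` messages through per batch.
class BatchLimitFilter < Base
  def initialize(limit)
    super()
    @limit = limit
  end

  def apply!(messages)
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    # The first message over the limit becomes the cursor, so processing
    # could later resume from it.
    @cursor = messages[@limit]
    @applied = true
    # Assumption: filtering mutates the given array in place.
    messages.slice!(@limit, messages.size)
  end

  # Pause only when messages were actually held back.
  def action
    applied? ? :pause : :skip
  end

  # Arbitrary pause duration; the unit (milliseconds) is an assumption.
  def timeout
    500
  end
end

messages = (0..4).map { |offset| Message.new(offset, "payload-#{offset}") }
filter = BatchLimitFilter.new(3)
filter.apply!(messages)

puts messages.map(&:offset).inspect
puts filter.cursor.offset
puts filter.action
puts filter.timeout
```

Returning :pause only when the filter was applied keeps small batches flowing without interruption, while the cursor records where the held-back messages begin.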