Class: Karafka::Pro::Processing::Filters::Base

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/filters/base.rb

Overview

Base class for all filters. All filters, including custom ones, need to use this API.

Because a filter can limit data in ways that require the consumer to pause or seek afterwards (throttling, for example), the API does more than "remove some things from the batch": it also provides ways to control the post-filtering operations that may be needed.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Base

Returns a new instance of Base.



# File 'lib/karafka/pro/processing/filters/base.rb', line 31

def initialize
  @applied = false
  @cursor = nil
end

Instance Attribute Details

#cursor ⇒ Karafka::Messages::Message? (readonly)

Returns the message to use as the cursor for a pause or seek, or nil if not applicable.

Returns:

  • (Karafka::Messages::Message, nil)

    the message to use as the cursor for a pause or seek, or nil if not applicable



# File 'lib/karafka/pro/processing/filters/base.rb', line 27

def cursor
  @cursor
end

Instance Method Details

#action ⇒ Symbol

Returns the post-execution action that the filter requests on the consumer: :skip, :pause, or :seek.

Returns:

  • (Symbol)

    the post-execution action that the filter requests on the consumer: :skip, :pause, or :seek



# File 'lib/karafka/pro/processing/filters/base.rb', line 44

def action
  :skip
end
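
The default above never asks the consumer to do anything. As a rough sketch of how a subclass might request a pause instead, the example below uses a stand-in `FilterBase` class that mirrors the defaults shown on this page, so it runs without Karafka. `BatchLimiter`, `Message`, and the `limit` and `pause_ms` parameters are hypothetical names introduced only for illustration. Resetting the state at the start of `apply!` assumes that one filter instance may be applied to several batches.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base so the sketch runs
# without Karafka; it mirrors the defaults documented on this page.
class FilterBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end

  def timeout
    0
  end
end

# Hypothetical message type; real messages are Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter: passes at most `limit` messages per batch and asks the
# consumer to pause, using the first dropped message as the cursor.
class BatchLimiter < FilterBase
  def initialize(limit, pause_ms)
    super()
    @limit = limit
    @pause_ms = pause_ms
  end

  def apply!(messages)
    # Reset per-batch state (assumes one instance may see several batches).
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    @cursor = messages[@limit]
    messages.slice!(@limit..) # drop everything from the cursor onwards, in place
    @applied = true
  end

  # Ask for a pause only when this batch was actually trimmed.
  def action
    applied? ? :pause : :skip
  end

  def timeout
    @pause_ms
  end
end

batch = (0..4).map { |i| Message.new(i, "payload-#{i}") }
filter = BatchLimiter.new(2, 500)
filter.apply!(batch)

puts batch.map(&:offset).inspect # offsets that survived the filter
puts filter.action               # requested post-filtering action
puts filter.cursor.offset        # offset of the first dropped message
```

Tying `action` to `applied?` keeps an untouched batch on the default `:skip` path, so the consumer is only paused when messages were actually held back.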

#applied? ⇒ Boolean

Returns whether this filter changed the messages in any way.

Returns:

  • (Boolean)

    whether this filter changed the messages in any way



# File 'lib/karafka/pro/processing/filters/base.rb', line 49

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    array of messages. Keep in mind that it may already be partial, because previous filters may have removed messages from it.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/pro/processing/filters/base.rb', line 38

def apply!(messages)
  raise NotImplementedError, 'Implement in a subclass'
end
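
To illustrate the note about partial batches, here is a minimal sketch of two filters applied one after the other to the same array. It assumes filters mutate the array in place and run in sequence, which is what the parameter description implies. `FilterBase` is a stand-in for this class so the example runs without Karafka. `DropEmptyPayloads`, `OffsetAtLeast`, and `Message` are hypothetical names.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (see the source above).
class FilterBase
  def initialize
    @applied = false
  end

  def apply!(_messages)
    raise NotImplementedError, 'Implement in a subclass'
  end

  def applied?
    @applied
  end
end

# Hypothetical message type; real messages are Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter: removes messages whose payload is nil or empty.
class DropEmptyPayloads < FilterBase
  def apply!(messages)
    before = messages.size
    messages.delete_if { |message| message.payload.to_s.empty? }
    @applied = messages.size < before
  end
end

# Hypothetical filter: removes messages below a given offset.
class OffsetAtLeast < FilterBase
  def initialize(min_offset)
    super()
    @min_offset = min_offset
  end

  def apply!(messages)
    before = messages.size
    messages.delete_if { |message| message.offset < @min_offset }
    @applied = messages.size < before
  end
end

batch = [
  Message.new(0, 'a'), Message.new(1, ''), Message.new(2, 'c'),
  Message.new(3, nil), Message.new(4, 'e')
]

# The second filter only ever sees what the first one left behind.
filters = [DropEmptyPayloads.new, OffsetAtLeast.new(2)]
filters.each { |filter| filter.apply!(batch) }

puts batch.map(&:offset).inspect     # offsets left after both filters
puts filters.map(&:applied?).inspect # whether each filter removed anything
```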

#mark_as_consumed? ⇒ Boolean

Returns whether the cursor value should be used to mark messages as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used.

Returns:

  • (Boolean)

    whether the cursor value should be used to mark messages as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used



# File 'lib/karafka/pro/processing/filters/base.rb', line 60

def mark_as_consumed?
  false
end
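
The "lowest applicable cursor" rule can be sketched as follows. This is one reading of the description, not Karafka's actual aggregation code: it assumes that only filters that were applied contribute a cursor, and that marking happens as soon as any of them requests it. `lowest_marking_cursor`, `FixedCursorFilter`, and `Message` are hypothetical names, and `FilterBase` is a stand-in for this class.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (see the source above).
class FilterBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    false
  end
end

# Hypothetical message type; only the offset matters here.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that starts out "applied" with a preset cursor, used
# only to set up state for the example.
class FixedCursorFilter < FilterBase
  def initialize(cursor_offset, mark:)
    super()
    @applied = true
    @cursor = Message.new(cursor_offset, nil)
    @mark = mark
  end

  def mark_as_consumed?
    @mark
  end
end

# Hypothetical helper: returns the cursor with the lowest offset among applied
# filters, or nil when no applied filter requests marking.
def lowest_marking_cursor(filters)
  applied = filters.select(&:applied?)
  return nil unless applied.any?(&:mark_as_consumed?)

  applied.map(&:cursor).compact.min_by(&:offset)
end

filters = [
  FixedCursorFilter.new(12, mark: false),
  FixedCursorFilter.new(7, mark: true)
]

cursor = lowest_marking_cursor(filters)
puts cursor.offset # offset that would be used for marking
```

Picking the lowest offset is the conservative choice: it avoids marking past a message that some other filter still wants the consumer to come back to.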

#marking_method ⇒ Symbol

Returns :mark_as_consumed or :mark_as_consumed!. Applicable only if marking is requested.

Returns:

  • (Symbol)

    :mark_as_consumed or :mark_as_consumed!. Applicable only if marking is requested



# File 'lib/karafka/pro/processing/filters/base.rb', line 66

def marking_method
  :mark_as_consumed
end

#timeout ⇒ Integer

Returns default timeout for pausing (if applicable).

Returns:

  • (Integer)

    default timeout for pausing (if applicable)



# File 'lib/karafka/pro/processing/filters/base.rb', line 54

def timeout
  0
end
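
A filter that pauses usually needs a non-zero timeout. The sketch below shows one possible way to compute it: the time left in a fixed window, clamped at zero. It assumes the value is in milliseconds, which this page does not state. `WindowPause`, its `window_ms` parameter, and the injected `clock` callable are hypothetical and exist only to keep the example deterministic. `FilterBase` is a stand-in for this class.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (see the source above).
class FilterBase
  def initialize
    @applied = false
    @cursor = nil
  end

  def timeout
    0
  end
end

# Hypothetical filter that wants the consumer paused until a time window ends.
class WindowPause < FilterBase
  # clock: any callable returning the current time in milliseconds
  def initialize(window_ms, clock)
    super()
    @window_ms = window_ms
    @clock = clock
    @started_at = clock.call
  end

  # Milliseconds left in the window as an Integer, never negative.
  def timeout
    remaining = @window_ms - (@clock.call - @started_at)
    remaining.positive? ? remaining.ceil : 0
  end
end

# A monotonic clock avoids surprises when the wall clock is adjusted.
monotonic_ms = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) }
filter = WindowPause.new(500, monotonic_ms)
puts filter.timeout # somewhere between 0 and 500, depending on elapsed time
```

Injecting the clock is a testing convenience: a fake clock makes the remaining time predictable, while production code would pass a monotonic one.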