Class: Karafka::Pro::Processing::Filters::Base
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::Filters::Base
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/filters/base.rb
Overview
Base for all the filters. All filters (including custom) need to use this API.
Due to the fact, that filters can limit data in such a way, that we need to pause or seek (throttling for example), the api is not just “remove some things from batch” but also provides ways to control the post-filtering operations that may be needed.
Direct Known Subclasses
Delayer, Expirer, InlineInsightsDelayer, Throttler, VirtualLimiter
Instance Attribute Summary collapse
-
#cursor ⇒ Karafka::Messages::Message?
readonly
The message that we want to use as a cursor one to pause or seek or nil if not applicable.
Instance Method Summary collapse
-
#action ⇒ Symbol
Filter post-execution action on consumer.
-
#applied? ⇒ Boolean
Did this filter change messages in any way.
- #apply!(messages) ⇒ Object
-
#initialize ⇒ Base
constructor
A new instance of Base.
-
#mark_as_consumed? ⇒ Boolean
Should we use the cursor value to mark as consumed.
-
#marking_method ⇒ Symbol
:mark_as_consumed
or:mark_as_consumed!
. -
#timeout ⇒ Integer
Default timeout for pausing (if applicable).
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
23 24 25 26 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 23 def initialize @applied = false @cursor = nil end |
Instance Attribute Details
#cursor ⇒ Karafka::Messages::Message? (readonly)
Returns the message that we want to use as a cursor one to pause or seek or nil if not applicable.
19 20 21 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 19 def cursor @cursor end |
Instance Method Details
#action ⇒ Symbol
Returns filter post-execution action on consumer. Either :skip
, :pause
or :seek
.
36 37 38 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 36 def action :skip end |
#applied? ⇒ Boolean
Returns did this filter change messages in any way.
41 42 43 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 41 def applied? @applied end |
#apply!(messages) ⇒ Object
30 31 32 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 30 def apply!() raise NotImplementedError, 'Implement in a subclass' end |
#mark_as_consumed? ⇒ Boolean
Returns should we use the cursor value to mark as consumed. If any of the filters returns true, we return lowers applicable cursor value (if any).
52 53 54 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 52 def mark_as_consumed? false end |
#marking_method ⇒ Symbol
Returns :mark_as_consumed
or :mark_as_consumed!
. Applicable only if marking is requested.
58 59 60 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 58 def marking_method :mark_as_consumed end |
#timeout ⇒ Integer
Returns default timeout for pausing (if applicable).
46 47 48 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 46 def timeout 0 end |