Class: Karafka::Pro::Processing::Filters::Base
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::Filters::Base
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/filters/base.rb
Overview
Base for all the filters. All filters (including custom) need to use this API.
Due to the fact, that filters can limit data in such a way, that we need to pause or seek (throttling for example), the api is not just “remove some things from batch” but also provides ways to control the post-filtering operations that may be needed.
Direct Known Subclasses
Delayer, Expirer, InlineInsightsDelayer, Throttler, VirtualLimiter, ParallelSegments::Filters::Base
Instance Attribute Summary collapse
-
#cursor ⇒ Karafka::Messages::Message?
readonly
The message that we want to use as a cursor one to pause or seek or nil if not applicable.
Instance Method Summary collapse
-
#action ⇒ Symbol
Filter post-execution action on consumer.
-
#applied? ⇒ Boolean
Did this filter change messages in any way.
- #apply!(messages) ⇒ Object
-
#initialize ⇒ Base
constructor
A new instance of Base.
-
#mark_as_consumed? ⇒ Boolean
Should we use the cursor value to mark as consumed.
-
#marking_cursor ⇒ Karafka::Messages::Message?
Cursor message for marking or nil if no marking.
-
#marking_method ⇒ Symbol
:mark_as_consumed
or:mark_as_consumed!
. -
#timeout ⇒ Integer?
Default timeout for pausing (if applicable) or nil if not.
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
23 24 25 26 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 23 def initialize @applied = false @cursor = nil end |
Instance Attribute Details
#cursor ⇒ Karafka::Messages::Message? (readonly)
Returns the message that we want to use as a cursor one to pause or seek or nil if not applicable.
19 20 21 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 19 def cursor @cursor end |
Instance Method Details
#action ⇒ Symbol
Returns filter post-execution action on consumer. Either :skip
, :pause
or :seek
.
36 37 38 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 36 def action :skip end |
#applied? ⇒ Boolean
Returns did this filter change messages in any way.
41 42 43 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 41 def applied? @applied end |
#apply!(messages) ⇒ Object
30 31 32 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 30 def apply!() raise NotImplementedError, 'Implement in a subclass' end |
#mark_as_consumed? ⇒ Boolean
Returns should we use the cursor value to mark as consumed. If any of the filters returns true, we return lowers applicable cursor value (if any).
54 55 56 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 54 def mark_as_consumed? false end |
#marking_cursor ⇒ Karafka::Messages::Message?
Returns cursor message for marking or nil if no marking.
66 67 68 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 66 def marking_cursor cursor end |
#marking_method ⇒ Symbol
Returns :mark_as_consumed
or :mark_as_consumed!
. Applicable only if marking is requested.
60 61 62 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 60 def marking_method :mark_as_consumed end |
#timeout ⇒ Integer?
Please do not return 0
when your filter is not pausing as it may interact with other filters that want to pause.
Returns default timeout for pausing (if applicable) or nil if not.
48 49 50 |
# File 'lib/karafka/pro/processing/filters/base.rb', line 48 def timeout nil end |