Class: Karafka::Pro::Processing::Filters::Base
- Inherits:
- 
      Object
      
        - Object
- Karafka::Pro::Processing::Filters::Base
 
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/filters/base.rb
Overview
Base for all the filters. All filters (including custom) need to use this API.
Due to the fact, that filters can limit data in such a way, that we need to pause or seek (throttling for example), the api is not just “remove some things from batch” but also provides ways to control the post-filtering operations that may be needed.
Direct Known Subclasses
Delayer, Expirer, InlineInsightsDelayer, Throttler, VirtualLimiter, ParallelSegments::Filters::Base
Instance Attribute Summary collapse
- 
  
    
      #cursor  ⇒ Karafka::Messages::Message? 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    The message that we want to use as a cursor one to pause or seek or nil if not applicable. 
Instance Method Summary collapse
- 
  
    
      #action  ⇒ Symbol 
    
    
  
  
  
  
  
  
  
  
  
    Filter post-execution action on consumer. 
- 
  
    
      #applied?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    Did this filter change messages in any way. 
- 
  
    
      #apply!(messages)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
- 
  
    
      #initialize  ⇒ Base 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of Base. 
- 
  
    
      #mark_as_consumed?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    Should we use the cursor value to mark as consumed. 
- 
  
    
      #marking_cursor  ⇒ Karafka::Messages::Message? 
    
    
  
  
  
  
  
  
  
  
  
    Cursor message for marking or nil if no marking. 
- 
  
    
      #marking_method  ⇒ Symbol 
    
    
  
  
  
  
  
  
  
  
  
    :mark_as_consumedor:mark_as_consumed!.
- 
  
    
      #timeout  ⇒ Integer? 
    
    
  
  
  
  
  
  
  
  
  
    Default timeout for pausing (if applicable) or nil if not. 
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
| 23 24 25 26 | # File 'lib/karafka/pro/processing/filters/base.rb', line 23 def initialize @applied = false @cursor = nil end | 
Instance Attribute Details
#cursor ⇒ Karafka::Messages::Message? (readonly)
Returns the message that we want to use as a cursor one to pause or seek or nil if not applicable.
| 19 20 21 | # File 'lib/karafka/pro/processing/filters/base.rb', line 19 def cursor @cursor end | 
Instance Method Details
#action ⇒ Symbol
Returns filter post-execution action on consumer. Either :skip, :pause or
:seek.
| 36 37 38 | # File 'lib/karafka/pro/processing/filters/base.rb', line 36 def action :skip end | 
#applied? ⇒ Boolean
Returns did this filter change messages in any way.
| 41 42 43 | # File 'lib/karafka/pro/processing/filters/base.rb', line 41 def applied? @applied end | 
#apply!(messages) ⇒ Object
| 30 31 32 | # File 'lib/karafka/pro/processing/filters/base.rb', line 30 def apply!() raise NotImplementedError, 'Implement in a subclass' end | 
#mark_as_consumed? ⇒ Boolean
Returns should we use the cursor value to mark as consumed. If any of the filters returns true, we return lowers applicable cursor value (if any).
| 54 55 56 | # File 'lib/karafka/pro/processing/filters/base.rb', line 54 def mark_as_consumed? false end | 
#marking_cursor ⇒ Karafka::Messages::Message?
Returns cursor message for marking or nil if no marking.
| 66 67 68 | # File 'lib/karafka/pro/processing/filters/base.rb', line 66 def marking_cursor cursor end | 
#marking_method ⇒ Symbol
Returns :mark_as_consumed or :mark_as_consumed!. Applicable only if
marking is requested.
| 60 61 62 | # File 'lib/karafka/pro/processing/filters/base.rb', line 60 def marking_method :mark_as_consumed end | 
#timeout ⇒ Integer?
Please do not return 0 when your filter is not pausing as it may interact
with other filters that want to pause.
Returns default timeout for pausing (if applicable) or nil if not.
| 48 49 50 | # File 'lib/karafka/pro/processing/filters/base.rb', line 48 def timeout nil end |