Class: Karafka::Pro::Processing::Coordinators::FiltersApplier
- Inherits: Object
- Inheritance chain: Object, Karafka::Pro::Processing::Coordinators::FiltersApplier
- Defined in: lib/karafka/pro/processing/coordinators/filters_applier.rb
Overview
Applier for all the filters we want to have, whether they limit messages based on the payload or on anything else.
From the outside world's perspective, it encapsulates all the filters: it is the API we expose as a single filter, which lets us easily control the filtering via many filters.
Instance Attribute Summary
- #filters ⇒ Array (readonly)
  Registered filters array.
Instance Method Summary
- #action ⇒ Symbol
  Consumer post-filtering action that should be taken.
- #applied? ⇒ Boolean
  Whether any messages were filtered out during the filtering run.
- #apply!(messages) ⇒ Object
- #cursor ⇒ Karafka::Messages::Message?
  The first message we need to fetch the next time we poll.
- #initialize(coordinator) ⇒ FiltersApplier (constructor)
  A new instance of FiltersApplier.
- #mark_as_consumed? ⇒ Boolean
  Whether any of the filters requested offset storage during filter application.
- #marking_cursor ⇒ Karafka::Messages::Message?
  The first (lowest) message we want to mark as consumed.
- #marking_method ⇒ Symbol
  Either :mark_as_consumed or :mark_as_consumed!.
- #timeout ⇒ Integer
  Minimum timeout we need to pause for.
Constructor Details
#initialize(coordinator) ⇒ FiltersApplier
Returns a new instance of FiltersApplier.
    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 22

    def initialize(coordinator)
      # Builds filters out of their factories
      # We build it that way (providing topic and partition) because there may be a case
      # where someone wants to have a specific logic that is per topic or partition. Like for
      # example a case where there is a cache bypassing revocations for topic partition.
      #
      # We provide full Karafka routing topic here and not the name only, in case the filter
      # would be customized based on other topic settings (like VPs, etc)
      #
      # This setup allows for biggest flexibility also because topic object holds the
      # reference to the subscription group and consumer group
      @filters = coordinator.topic.filtering.factories.map do |factory|
        factory.call(coordinator.topic, coordinator.partition)
      end
    end
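The factory mapping in the constructor can be illustrated in isolation. The following is a minimal sketch, assuming each factory is any callable that accepts a topic and a partition; `SketchTopic` and `SketchFilter` are hypothetical stand-ins and not Karafka classes.

```ruby
# Hypothetical stand-ins for illustration only; these are not Karafka classes.
SketchTopic = Struct.new(:name)
SketchFilter = Struct.new(:topic, :partition)

# Each factory is a callable that receives the topic and the partition.
factories = [
  ->(topic, partition) { SketchFilter.new(topic, partition) }
]

topic = SketchTopic.new('events')
partition = 0

# Mirrors the mapping done in the constructor: one filter per factory.
filters = factories.map { |factory| factory.call(topic, partition) }

puts filters.first.topic.name # prints "events"
puts filters.first.partition  # prints 0
```

Passing the whole topic object rather than only its name lets a factory decide what to build based on any topic setting it can reach.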
Instance Attribute Details
#filters ⇒ Array (readonly)
Returns the array of registered filters. Useful if we want to inject internal context-aware filters.
    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 19

    def filters
      @filters
    end
Instance Method Details
#action ⇒ Symbol
Returns consumer post-filtering action that should be taken.
    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 54

    def action
      return :skip unless applied?

      # The highest priority is on a potential backoff from any of the filters because it is
      # the less risky (delay and continue later)
      return :pause if applied.any? { |filter| filter.action == :pause }

      # If none of the filters wanted to pause, we can check for any that would want to seek
      # and if there is any, we can go with this strategy
      return :seek if applied.any? { |filter| filter.action == :seek }

      :skip
    end
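The priority order above (`:pause` over `:seek` over `:skip`) can be tried out standalone. This is a sketch only: `StubFilter` and `resolve_action` are hypothetical names, and the list passed in stands for whatever the private `applied` helper returns, which this page does not show.

```ruby
# Hypothetical stub exposing only the #action reader used by the resolution logic.
StubFilter = Struct.new(:action)

# Standalone restatement of the priority rules: pause wins over seek, seek over skip.
def resolve_action(applied)
  return :skip if applied.empty?
  return :pause if applied.any? { |filter| filter.action == :pause }
  return :seek if applied.any? { |filter| filter.action == :seek }

  :skip
end

p resolve_action([StubFilter.new(:seek), StubFilter.new(:pause)]) # => :pause
p resolve_action([StubFilter.new(:skip), StubFilter.new(:seek)])  # => :seek
p resolve_action([])                                              # => :skip
```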
#applied? ⇒ Boolean
Returns whether any messages were filtered out during the filtering run.

    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 47

    def applied?
      return false unless active?

      !applied.empty?
    end
#apply!(messages) ⇒ Object
    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 40

    def apply!(messages)
      return unless active?

      @filters.each { |filter| filter.apply!(messages) }
    end
#cursor ⇒ Karafka::Messages::Message?
Note: The cursor message can also return the offset in the time format.
Returns the first message we need to fetch the next time we poll. We use the minimum so that we do not accidentally skip over any message.

    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 78

    def cursor
      return nil unless active?

      applied.map(&:cursor).compact.min_by(&:offset)
    end
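The "lowest cursor wins" rule is plain `Enumerable#min_by` over the non-nil cursors. A small sketch with numeric offsets only, using hypothetical `SketchMessage` and `CursorFilter` structs in place of real messages and filters:

```ruby
# Hypothetical stand-ins: a message with an offset and a filter exposing a cursor.
SketchMessage = Struct.new(:offset)
CursorFilter = Struct.new(:cursor)

applied = [
  CursorFilter.new(SketchMessage.new(42)),
  CursorFilter.new(nil), # a filter that reports no cursor
  CursorFilter.new(SketchMessage.new(17))
]

# Drop the nil cursors, then pick the message with the smallest offset.
lowest = applied.map(&:cursor).compact.min_by(&:offset)
p lowest.offset # => 17

# With no cursors at all, min_by returns nil.
p [].min_by(&:offset) # => nil
```

Resuming from offset 17 rather than 42 means nothing between the two is jumped over.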
#mark_as_consumed? ⇒ Boolean
Returns whether any of the filters requested offset storage during filter application.

    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 86

    def mark_as_consumed?
      # We can manage filtering offset only when user wanted that and there is a cursor
      # to use
      applied.any?(&:mark_as_consumed?) && cursor
    end
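Because the expression ends with `&& cursor`, the result is truthy or falsy rather than strictly `true` or `false`. The sketch below restates that expression with hypothetical stand-ins (`MarkMessage`, `MarkingFilter`, `storable?`) to show the three possible outcomes.

```ruby
# Hypothetical stand-ins, not Karafka classes.
MarkMessage = Struct.new(:offset)
MarkingFilter = Struct.new(:wants_marking, :cursor) do
  def mark_as_consumed?
    wants_marking
  end
end

# Restates the documented expression: some filter asked for marking AND a cursor exists.
def storable?(applied)
  cursor = applied.map(&:cursor).compact.min_by(&:offset)
  applied.any?(&:mark_as_consumed?) && cursor
end

with_cursor = [MarkingFilter.new(true, MarkMessage.new(5))]
without_cursor = [MarkingFilter.new(true, nil)]
not_requested = [MarkingFilter.new(false, MarkMessage.new(5))]

p storable?(with_cursor).offset # => 5 (the cursor message itself, which is truthy)
p storable?(without_cursor)     # => nil
p storable?(not_requested)      # => false
```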
#marking_cursor ⇒ Karafka::Messages::Message?
Note: It should not return a position in the time format, only a numerical offset.
Returns the first (lowest) message we want to mark as consumed. By default it uses the same position as the cursor, in case the user wants to mark as consumed the same message on which the cursor action is applied.

    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 106

    def marking_cursor
      return nil unless active?

      applied.map(&:marking_cursor).compact.min_by(&:offset)
    end
#marking_method ⇒ Symbol
Returns :mark_as_consumed or :mark_as_consumed!.
    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 93

    def marking_method
      candidates = applied.map(&:marking_method)

      return :mark_as_consumed! if candidates.include?(:mark_as_consumed!)

      :mark_as_consumed
    end
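In other words, a single filter asking for `:mark_as_consumed!` is enough for the bang variant to be selected. A minimal sketch of that selection, where `pick_marking_method` is a hypothetical helper name and the arrays stand for the methods collected from the applied filters:

```ruby
# Hypothetical helper restating the selection rule: the bang variant wins if any
# candidate asks for it; otherwise fall back to the non-bang variant.
def pick_marking_method(candidates)
  return :mark_as_consumed! if candidates.include?(:mark_as_consumed!)

  :mark_as_consumed
end

p pick_marking_method(%i[mark_as_consumed mark_as_consumed!]) # => :mark_as_consumed!
p pick_marking_method(%i[mark_as_consumed])                   # => :mark_as_consumed
p pick_marking_method([])                                     # => :mark_as_consumed
```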
#timeout ⇒ Integer
Returns the minimum timeout for which we need to pause: the smallest timeout reported by any of the applied filters, or 0 when none of them reports one.

    # File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 70

    def timeout
      applied.map(&:timeout).compact.min || 0
    end
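The `compact.min || 0` idiom can be checked on plain arrays. In this sketch the arrays stand for the timeouts collected from the applied filters, and `minimum_timeout` is a hypothetical helper name.

```ruby
# Hypothetical helper restating the documented expression on a plain array.
def minimum_timeout(timeouts)
  # compact drops filters that reported no timeout (nil); min is nil for an empty
  # array, so the || 0 fallback covers the case where no filter reported a timeout.
  timeouts.compact.min || 0
end

p minimum_timeout([nil, 5_000, 1_000]) # => 1000
p minimum_timeout([nil, nil])          # => 0
p minimum_timeout([])                  # => 0
```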