Class: Karafka::Pro::Processing::Coordinators::FiltersApplier

Inherits:
Object
Defined in:
lib/karafka/pro/processing/coordinators/filters_applier.rb

Overview

Applies all the filters we want to have, whether they limit messages based on the payload or on any other criteria.

From the outside, this class encapsulates all the filters: it exposes them through the API of a single filter, which makes it easy to control filtering via many filters at once.
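To make the single-filter contract concrete, here is a minimal sketch of a filter that an applier like this one could aggregate. It implements only the methods this page shows the applier calling on each filter (`apply!`, `applied?`, `action`, `timeout`, `cursor`, `marking_cursor`, `mark_as_consumed?`, `marking_method`). `PayloadSizeFilter`, the `Msg` struct and the byte limit are hypothetical; this is not Karafka's own base filter class.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message
Msg = Struct.new(:offset, :raw_payload)

# Hypothetical filter that drops messages whose payload exceeds a byte limit.
# It exposes the same methods the applier queries on every filter.
class PayloadSizeFilter
  def initialize(max_bytes)
    @max_bytes = max_bytes
    @applied = false
  end

  # Mutates the batch in place; the applier ignores the return value
  def apply!(messages)
    @applied = false

    messages.delete_if do |message|
      too_big = message.raw_payload.bytesize > @max_bytes
      @applied = true if too_big
      too_big
    end
  end

  def applied?
    @applied
  end

  # Dropping is final in this sketch, so no pause or seek is requested
  def action
    :skip
  end

  def timeout
    nil
  end

  def cursor
    nil
  end

  def marking_cursor
    cursor
  end

  def mark_as_consumed?
    false
  end

  def marking_method
    :mark_as_consumed
  end
end

batch = [Msg.new(0, 'ok'), Msg.new(1, 'x' * 100), Msg.new(2, 'fine')]
filter = PayloadSizeFilter.new(10)
filter.apply!(batch)

batch.map(&:offset) # => [0, 2]
filter.applied?     # => true
```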

Constructor Details

#initialize(coordinator) ⇒ FiltersApplier

Returns a new instance of FiltersApplier.

Parameters:

  • coordinator (Pro::Coordinator)

    pro coordinator



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 22

def initialize(coordinator)
  # Builds filters out of their factories
  # We build them this way (providing topic and partition) because someone may want
  # logic that is specific to a topic or partition, for example a cache that bypasses
  # revocations for a topic partition.
  #
  # We provide the full Karafka routing topic here, not only its name, in case the filter
  # needs to be customized based on other topic settings (like VPs, etc.)
  #
  # This setup also allows for the greatest flexibility because the topic object holds
  # references to the subscription group and the consumer group
  @filters = coordinator.topic.filtering.factories.map do |factory|
    factory.call(coordinator.topic, coordinator.partition)
  end
end
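The factory contract in the constructor can be sketched as follows: each factory is a callable that takes the routing topic and the partition and returns a fresh filter, so every topic partition gets its own filter instances. `Topic`, `Filtering` and `PartitionFilter` are hypothetical stand-ins for the routing objects, used only to keep the sketch self-contained.

```ruby
# Hypothetical stand-ins for the routing topic and its filtering config
Filtering = Struct.new(:factories)
Topic = Struct.new(:name, :filtering)

# Hypothetical filter that remembers which topic partition it was built for
class PartitionFilter
  attr_reader :topic_name, :partition

  def initialize(topic_name, partition)
    @topic_name = topic_name
    @partition = partition
  end
end

factories = [
  ->(topic, partition) { PartitionFilter.new(topic.name, partition) }
]

topic = Topic.new('events', Filtering.new(factories))

# Same mapping as in #initialize: one call per factory, per topic partition
filters_p0 = topic.filtering.factories.map { |factory| factory.call(topic, 0) }
filters_p1 = topic.filtering.factories.map { |factory| factory.call(topic, 1) }

filters_p0.first.partition                # => 0
filters_p1.first.partition                # => 1
filters_p0.first.equal?(filters_p1.first) # => false
```

Because the factory runs once per partition, any state a filter keeps (such as a per-partition cache) lives in separate objects unless the factory deliberately returns a shared instance.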

Instance Attribute Details

#filters ⇒ Array (readonly)

Returns the array of registered filters. Useful when we want to inject internal, context-aware filters.

Returns:

  • (Array)

    registered filters array. Useful when we want to inject internal, context-aware filters.



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 19

def filters
  @filters
end
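"Readonly" here only means there is no setter: the reader returns the internal array itself, not a copy, so appending to it registers an extra filter for all later calls. A minimal sketch of that injection pattern, using a hypothetical `Holder` class that mirrors the reader above and plain symbols in place of filter objects:

```ruby
# Hypothetical class mirroring the reader: it exposes the array itself
class Holder
  def initialize
    @filters = [:from_routing]
  end

  def filters
    @filters
  end
end

holder = Holder.new

# Injects an internal, context-aware filter next to the routing-defined ones
holder.filters << :internal

holder.filters # => [:from_routing, :internal]
```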

Instance Method Details

#action ⇒ Symbol

Returns the post-filtering action the consumer should take.

Returns:

  • (Symbol)

    post-filtering action the consumer should take



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 54

def action
  return :skip unless applied?

  # The highest priority is on a potential backoff from any of the filters because it is
  # the least risky option (delay and continue later)
  return :pause if applied.any? { |filter| filter.action == :pause }

  # If none of the filters wanted to pause, we can check for any that would want to seek
  # and if there is any, we can go with this strategy
  return :seek if applied.any? { |filter| filter.action == :seek }

  :skip
end
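The precedence rules in `#action` (pause over seek, seek over skip) can be isolated into a small pure function. This is a sketch for illustration only: `resolve_action` is a hypothetical helper that receives the actions of the already-applied filters as plain symbols.

```ruby
# Hypothetical helper reproducing the precedence used by #action
def resolve_action(applied_actions)
  # Nothing was filtered out, so there is nothing special to do
  return :skip if applied_actions.empty?
  # Backoff is the least risky option, so it wins over everything else
  return :pause if applied_actions.include?(:pause)
  return :seek if applied_actions.include?(:seek)

  :skip
end

resolve_action([])             # => :skip
resolve_action(%i[skip seek])  # => :seek
resolve_action(%i[seek pause]) # => :pause
```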

#applied? ⇒ Boolean

Returns whether any messages were filtered out during the filtering run.

Returns:

  • (Boolean)

    whether any messages were filtered out during the filtering run



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 47

def applied?
  return false unless active?

  !applied.empty?
end
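`active?` and `applied` are private helpers that this page does not show. The sketch below assumes, hypothetically, that `active?` means at least one filter is registered and that `applied` selects the filters reporting `applied?`. Both definitions are guesses made for illustration, not copied from Karafka.

```ruby
# Hypothetical filter stub that only reports whether it removed anything
StubFilter = Struct.new(:applied) do
  def applied?
    applied
  end
end

# Hypothetical applier fragment; active? and applied are assumptions
class ApplierSketch
  def initialize(filters)
    @filters = filters
  end

  def applied?
    return false unless active?

    !applied.empty?
  end

  private

  # Assumed: filtering is active when any filter is registered
  def active?
    !@filters.empty?
  end

  # Assumed: only filters that actually removed messages count
  def applied
    @filters.select(&:applied?)
  end
end

ApplierSketch.new([]).applied?                                            # => false
ApplierSketch.new([StubFilter.new(false)]).applied?                       # => false
ApplierSketch.new([StubFilter.new(false), StubFilter.new(true)]).applied? # => true
```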

#apply!(messages) ⇒ Object

Parameters:



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 40

def apply!(messages)
  return unless active?

  @filters.each { |filter| filter.apply!(messages) }
end
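`apply!` passes the same array to every filter and ignores the return values, so each filter has to modify the batch in place, and each one sees what the previous filters left behind. A minimal sketch with a hypothetical `RuleFilter` wrapper, where integers stand in for messages:

```ruby
# Hypothetical filter wrapping a rejection rule
RuleFilter = Struct.new(:rule) do
  def apply!(messages)
    # In-place removal: the caller keeps using the same array object
    messages.reject!(&rule)
  end
end

filters = [
  RuleFilter.new(->(m) { m.odd? }), # drops odd "messages"
  RuleFilter.new(->(m) { m > 4 })   # then drops anything above 4
]

messages = (0..9).to_a
filters.each { |filter| filter.apply!(messages) }

messages # => [0, 2, 4]
```

Note that `reject!` returns `nil` when nothing was removed, so a filter's return value is not a reliable result in any case; only the mutated array matters.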

#cursor ⇒ Karafka::Messages::Message?

Note:

The cursor message can also carry its offset in the time format.

Returns the first message we need to fetch on the next poll. We use the minimum so that we do not accidentally jump over any message.

Returns:



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 78

def cursor
  return nil unless active?

  applied.map(&:cursor).compact.min_by(&:offset)
end
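The cursor selection picks the lowest-offset cursor among the applied filters and ignores filters without one. A self-contained sketch of that expression, with hypothetical `Msg` and `CursorFilter` structs standing in for messages and filters:

```ruby
# Hypothetical stand-ins for messages and applied filters
Msg = Struct.new(:offset)
CursorFilter = Struct.new(:cursor)

applied = [
  CursorFilter.new(Msg.new(120)),
  CursorFilter.new(nil),          # this filter has no cursor
  CursorFilter.new(Msg.new(95))
]

# Lowest offset wins so that no message is skipped on the next poll
cursor = applied.map(&:cursor).compact.min_by(&:offset)
cursor.offset # => 95

# With no cursors at all, min_by on an empty array yields nil
[CursorFilter.new(nil)].map(&:cursor).compact.min_by(&:offset) # => nil
```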

#mark_as_consumed? ⇒ Boolean

Returns whether any of the filters requested offset storage during filter application.

Returns:

  • (Boolean)

    whether any of the filters requested offset storage during filter application



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 86

def mark_as_consumed?
  # We can manage filtering offset only when user wanted that and there is a cursor
  # to use
  applied.any?(&:mark_as_consumed?) && cursor
end
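Although documented as Boolean, `any? && cursor` returns the cursor message itself (or `false`/`nil`), which behaves correctly in a conditional but is not a strict `true`. A sketch of that behavior, where `mark_as_consumed_sketch`, `MarkFilter` and `Msg` are hypothetical stand-ins:

```ruby
# Hypothetical stand-in for a message
Msg = Struct.new(:offset)

# Hypothetical filter stub exposing only the marking request flag
MarkFilter = Struct.new(:mark) do
  def mark_as_consumed?
    mark
  end
end

# Same expression as #mark_as_consumed?, with filters and cursor passed in
def mark_as_consumed_sketch(applied, cursor)
  applied.any?(&:mark_as_consumed?) && cursor
end

cursor = Msg.new(10)

with_cursor = mark_as_consumed_sketch([MarkFilter.new(true)], cursor)
no_request = mark_as_consumed_sketch([MarkFilter.new(false)], cursor)
no_cursor = mark_as_consumed_sketch([MarkFilter.new(true)], nil)

with_cursor.equal?(cursor) # => true (the message itself, not `true`)
no_request                 # => false
no_cursor                  # => nil
!!with_cursor              # => true
```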

#marking_cursor ⇒ Karafka::Messages::Message?

Note:

It should return only a numerical offset, never a position in the time format.

Returns the first (lowest) message we want to mark as consumed. By default, it uses the same position as the cursor, in case the user wants to mark as consumed the same message on which the cursor action is applied.

Returns:



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 106

def marking_cursor
  return nil unless active?

  applied.map(&:marking_cursor).compact.min_by(&:offset)
end
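The default described for `#marking_cursor`, where a filter's marking cursor falls back to its seek cursor unless overridden, can be sketched with a hypothetical base class and subclass. `BaseFilterSketch` and `LaggingFilter` are illustrative names, not Karafka classes; the applier-side expression then takes the lowest offset, just like `#cursor`.

```ruby
# Hypothetical stand-in for a message
Msg = Struct.new(:offset)

# Hypothetical base: marking defaults to the same message as the cursor
class BaseFilterSketch
  attr_reader :cursor

  def initialize(cursor)
    @cursor = cursor
  end

  def marking_cursor
    cursor
  end
end

# Hypothetical override: mark the message right before the cursor instead
class LaggingFilter < BaseFilterSketch
  def marking_cursor
    return nil unless cursor

    Msg.new(cursor.offset - 1)
  end
end

applied = [BaseFilterSketch.new(Msg.new(50)), LaggingFilter.new(Msg.new(40))]

applied.map(&:cursor).compact.min_by(&:offset).offset         # => 40
applied.map(&:marking_cursor).compact.min_by(&:offset).offset # => 39
```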

#marking_method ⇒ Symbol

Returns :mark_as_consumed or :mark_as_consumed!.

Returns:

  • (Symbol)

    :mark_as_consumed or :mark_as_consumed!



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 93

def marking_method
  candidates = applied.map(&:marking_method)

  return :mark_as_consumed! if candidates.include?(:mark_as_consumed!)

  :mark_as_consumed
end
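The selection in `#marking_method` means that a single filter asking for `:mark_as_consumed!` upgrades the marking for the whole batch. A sketch with a hypothetical `marking_method_sketch` helper that takes the candidate symbols directly:

```ruby
# Hypothetical helper mirroring #marking_method
def marking_method_sketch(candidates)
  # Any request for the bang variant wins over the plain one
  return :mark_as_consumed! if candidates.include?(:mark_as_consumed!)

  :mark_as_consumed
end

marking_method_sketch([])                                     # => :mark_as_consumed
marking_method_sketch(%i[mark_as_consumed])                   # => :mark_as_consumed
marking_method_sketch(%i[mark_as_consumed mark_as_consumed!]) # => :mark_as_consumed!
```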

#timeout ⇒ Integer

Returns the minimum timeout we need to pause for. It is the smallest timeout among all the applied filters, which satisfies all of them.

Returns:

  • (Integer)

    minimum timeout we need to pause for. It is the smallest timeout among all the applied filters, which satisfies all of them.



# File 'lib/karafka/pro/processing/coordinators/filters_applier.rb', line 70

def timeout
  applied.map(&:timeout).compact.min || 0
end
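The timeout expression skips filters that do not define a timeout and falls back to `0` when none does. A sketch with a hypothetical `TimeoutFilter` struct; the numeric values are arbitrary and no unit is implied:

```ruby
# Hypothetical stand-in for an applied filter exposing only #timeout
TimeoutFilter = Struct.new(:timeout)

# Same expression as #timeout, with the applied filters passed in
def timeout_sketch(applied)
  # nil timeouts are dropped; an empty result falls back to 0
  applied.map(&:timeout).compact.min || 0
end

timeout_sketch([TimeoutFilter.new(5_000), TimeoutFilter.new(nil), TimeoutFilter.new(1_000)]) # => 1000
timeout_sketch([TimeoutFilter.new(nil)])                                                      # => 0
timeout_sketch([])                                                                            # => 0
```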