Class: Karafka::Pro::Processing::Filters::InlineInsightsDelayer

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/pro/processing/filters/inline_insights_delayer.rb

Overview

Delayer that checks if we have appropriate insights available. If not, pauses for 5 seconds so the insights can be loaded from the broker.

In case it would take more than five seconds to load insights, it will just pause again

This filter ensures, that we always have inline insights that a consumer can use

It is relevant in most cases only during the process start, when first poll may not yield statistics yet but will give some data.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary collapse

Methods inherited from Base

#applied?, #mark_as_consumed?, #marking_method

Constructor Details

#initialize(topic, partition) ⇒ InlineInsightsDelayer

Returns a new instance of InlineInsightsDelayer.

Parameters:



35
36
37
38
39
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 35

def initialize(topic, partition)
  super()
  @topic = topic
  @partition = partition
end

Instance Method Details

#actionObject

Pause when we had to back-off or skip if delay is not needed



71
72
73
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 71

def action
  applied? ? :pause : :skip
end

#apply!(messages) ⇒ Object

Pauses if inline insights would not be available. Does nothing otherwise

Parameters:



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 44

def apply!(messages)
  @applied = false
  @cursor = messages.first

  # Nothing to do if there were no messages
  # This can happen when we chain filters
  return unless @cursor

  insights = ::Karafka::Processing::InlineInsights::Tracker.find(
    @topic,
    @partition
  )

  # If insights are available, also nothing to do here and we can just process
  return unless insights.empty?

  messages.clear

  @applied = true
end

#timeoutInteger

Returns ms timeout in case of pause.

Returns:

  • (Integer)

    ms timeout in case of pause



66
67
68
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 66

def timeout
  @cursor && applied? ? PAUSE_TIMEOUT : 0
end