Class: Karafka::Pro::Processing::Filters::Throttler

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/pro/processing/filters/throttler.rb

Overview

Throttler used to limit number of messages we can process in a given time interval The tricky thing is, that even if we throttle on 100 messages, if we’ve reached 100, we still need to indicate, that we throttle despite not receiving 101. Otherwise we will not pause the partition and will fetch more data that we should not process.

This is a special type of a filter that always throttles and makes us wait / seek if anything is applied out.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary collapse

Methods inherited from Base

#applied?, #mark_as_consumed?, #marking_method

Constructor Details

#initialize(limit, interval) ⇒ Throttler

Returns a new instance of Throttler.

Parameters:

  • limit (Integer)

    how many messages we can process in a given time

  • interval (Integer)

    interval in milliseconds for which we want to process



21
22
23
24
25
26
27
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 21

def initialize(limit, interval)
  super()

  @limit = limit
  @interval = interval
  @requests = Hash.new { |h, k| h[k] = 0 }
end

Instance Method Details

#actionSymbol

Returns action to take upon throttler reaching certain state.

Returns:

  • (Symbol)

    action to take upon throttler reaching certain state



58
59
60
61
62
63
64
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 58

def action
  if applied?
    timeout.zero? ? :seek : :pause
  else
    :skip
  end
end

#apply!(messages) ⇒ Object

Limits number of messages to a range that we can process (if needed) and keeps track of how many messages we’ve processed in a given time

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    limits the number of messages to number we can accept in the context of throttling constraints



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 33

def apply!(messages)
  @applied = false
  @cursor = nil
  @time = monotonic_now
  @requests.delete_if { |timestamp, _| timestamp < (@time - @interval) }
  values = @requests.values.sum
  accepted = 0

  messages.delete_if do |message|
    # +1 because of current
    @applied = (values + accepted + 1) > @limit

    @cursor = message if @applied && @cursor.nil?

    next true if @applied

    accepted += 1

    false
  end

  @requests[@time] += accepted
end

#timeoutInteger

Returns minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message.

Returns:

  • (Integer)

    minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message



68
69
70
71
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 68

def timeout
  timeout = @interval - (monotonic_now - @time)
  timeout <= 0 ? 0 : timeout
end