Class: Karafka::Pro::Processing::AdaptiveIterator::Tracker

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/adaptive_iterator/tracker.rb

Overview

Tracker monitors message processing against the poll interval limit. It keeps the consumer from exceeding the maximum poll interval by tracking the processing cost and determining when further processing should halt (if needed).

Constructor Details

#initialize(safety_margin, last_polled_at, max_poll_interval_ms) ⇒ Tracker

Initializes a new Tracker instance.

Parameters:

  • safety_margin (Float)

    The safety margin percentage (0-100) to leave as a buffer.

  • last_polled_at (Float)

    The monotonic timestamp of the last poll, in milliseconds (it is compared against monotonic_now).

  • max_poll_interval_ms (Integer)

    The maximum poll interval time in milliseconds.



# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 22

def initialize(
  safety_margin,
  last_polled_at,
  max_poll_interval_ms
)
  @safety_margin = safety_margin / 100.0 # Convert percentage to decimal
  @last_polled_at = last_polled_at
  @max_processing_cost = 0
  @max_poll_interval_ms = max_poll_interval_ms
end

Instance Method Details

#enough? ⇒ Boolean

Determines whether processing should stop because too little time remains before the maximum poll interval is reached. The check accounts for both the safety margin and an adaptive margin equal to the highest processing cost recorded so far by #track.

Returns:

  • (Boolean)

    true if it is time to stop processing, false otherwise.



# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 52

def enough?
  elapsed_time_ms = monotonic_now - @last_polled_at
  remaining_time_ms = @max_poll_interval_ms - elapsed_time_ms

  safety_margin_ms = @max_poll_interval_ms * @safety_margin

  return true if remaining_time_ms <= safety_margin_ms
  return true if remaining_time_ms - @max_processing_cost <= safety_margin_ms

  false
end
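
To make the arithmetic in #enough? concrete, here is a minimal sketch that repeats the same two comparisons with the elapsed time passed in as an argument instead of being read from the clock. The helper name stop_processing?, its keyword arguments, and the sample numbers are hypothetical and are not part of Karafka.

```ruby
# Hypothetical helper (not part of Karafka): the same comparisons as #enough?,
# with the elapsed time supplied by the caller so the result is deterministic.
def stop_processing?(elapsed_ms:, max_poll_interval_ms:, safety_margin_pct:, max_processing_cost_ms:)
  remaining_time_ms = max_poll_interval_ms - elapsed_ms
  safety_margin_ms = max_poll_interval_ms * (safety_margin_pct / 100.0)

  # Stop when the remaining time has already eaten into the safety margin
  return true if remaining_time_ms <= safety_margin_ms
  # Stop when one more unit of work at the worst observed cost would eat into it
  return true if remaining_time_ms - max_processing_cost_ms <= safety_margin_ms

  false
end

# Sample values: 300 s poll interval, 10% margin (30 s), worst observed cost 5 s
limits = { max_poll_interval_ms: 300_000, safety_margin_pct: 10, max_processing_cost_ms: 5_000 }

early = stop_processing?(elapsed_ms: 100_000, **limits) # 195_000 ms of headroom
near  = stop_processing?(elapsed_ms: 266_000, **limits) # 34_000 - 5_000 <= 30_000
late  = stop_processing?(elapsed_ms: 271_000, **limits) # 29_000 <= 30_000
```

With these sample values, early is false while near and late are both true. The near case shows the adaptive part of the check: the remaining time is still above the safety margin, but one more unit of work at the worst observed cost would cross it.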

#track { ... } ⇒ Object

Tracks the processing time of a block and updates the maximum processing cost.

Yields:

  • Executes the block, measuring the time taken for processing.



# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 36

def track
  before = monotonic_now

  yield

  time_taken = monotonic_now - before

  return unless time_taken > @max_processing_cost

  @max_processing_cost = time_taken
end
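
The sketch below shows one way a caller might combine #track and #enough? in a processing loop. It is an illustration under stated assumptions, not the library's iterator: SketchTracker is a hypothetical stand-in that mirrors the logic documented above, and its clock is injected as an extra constructor argument so the run is deterministic (the documented class reads the time via monotonic_now instead).

```ruby
# Hypothetical stand-in mirroring the Tracker logic documented above.
# The injected clock is an assumption made only to keep the example deterministic.
class SketchTracker
  def initialize(safety_margin, last_polled_at, max_poll_interval_ms, clock)
    @safety_margin = safety_margin / 100.0 # Convert percentage to decimal
    @last_polled_at = last_polled_at
    @max_processing_cost = 0
    @max_poll_interval_ms = max_poll_interval_ms
    @clock = clock
  end

  # Measures the block and remembers the highest cost seen so far
  def track
    before = @clock.call
    yield
    time_taken = @clock.call - before
    @max_processing_cost = time_taken if time_taken > @max_processing_cost
  end

  # The cost is never negative, so this single comparison also covers the
  # plain "remaining time is within the safety margin" case
  def enough?
    remaining_time_ms = @max_poll_interval_ms - (@clock.call - @last_polled_at)
    safety_margin_ms = @max_poll_interval_ms * @safety_margin
    remaining_time_ms - @max_processing_cost <= safety_margin_ms
  end
end

now = 0.0 # simulated monotonic time in milliseconds
tracker = SketchTracker.new(10, 0.0, 1_000, -> { now })

processed = []
(1..10).each do |message|
  break if tracker.enough?

  # Each simulated message costs 200 ms of processing time
  tracker.track do
    now += 200
    processed << message
  end
end
# processed is [1, 2, 3, 4]
```

With a 1,000 ms interval and a 10% margin, the loop stops before the fifth message: 200 ms remain at that point, and subtracting the 200 ms worst-case cost leaves nothing above the 100 ms safety margin.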