Class: Karafka::Web::Processing::Consumers::Aggregators::Base

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/processing/consumers/aggregators/base.rb

Overview

Note:

It is important to understand that we operate here on a moment in time, and this moment may not be the current “now”. There might have been a lag, and we may be catching up on older states. This is why we use the @aggregated_from time instead of the real now. In case of a lag, we want to aggregate and catch up with the data without assigning it to the time of processing, aligning it instead with the time from which the given reports came. This allows us to compensate for potential lag related to rebalances, downtimes, failures, etc.

Base for all consumer-related aggregators that operate on process reports.
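
To illustrate the note above, here is a minimal sketch of why the reference time is taken from the reports rather than from the wall clock. Everything in it is an assumption made for illustration: the report shape (only a `:dispatched_at` Unix timestamp), the `TTL` threshold, and the helper names `reference_time` and `active?` are hypothetical and are not part of the Karafka Web API.

```ruby
# Illustrative threshold in seconds; NOT a Karafka Web setting.
TTL = 60

# Hypothetical helper: the newest dispatch time among the reports acts as
# the "moment in time" we aggregate from.
def reference_time(reports)
  reports.map { |report| report[:dispatched_at] }.max
end

# Hypothetical helper: a report counts as active when it is recent
# relative to the given reference time.
def active?(report, reference, ttl = TTL)
  reference - report[:dispatched_at] <= ttl
end

# Assumed report shape: an id plus a Unix timestamp of dispatch.
reports = [
  { id: 'a', dispatched_at: 1_000 },
  { id: 'b', dispatched_at: 1_030 }
]

aggregated_from = reference_time(reports)

# Simulates a consumer that is catching up long after the reports were sent.
wall_clock = 5_000

by_report_time = reports.select { |r| active?(r, aggregated_from) }.map { |r| r[:id] }
by_wall_clock = reports.select { |r| active?(r, wall_clock) }.map { |r| r[:id] }
```

Measured against the report-derived time, both processes are still considered active. Measured against the lagging consumer's wall clock, both would look expired even though they were healthy at the moment they reported.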

Direct Known Subclasses

Metrics, State

Instance Method Summary

Constructor Details

#initialize ⇒ Base

Returns a new instance of Base.



# File 'lib/karafka/web/processing/consumers/aggregators/base.rb', line 22

def initialize
  @active_reports = {}
end

Instance Method Details

#add(report) ⇒ Object

Adds the report to the internal hash of active reports and updates the aggregation time that is used as the internal time reference.

Parameters:

  • report (Hash)

    incoming process state report



# File 'lib/karafka/web/processing/consumers/aggregators/base.rb', line 29

def add(report)
  memoize_process_report(report)
  update_aggregated_from
end
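
The two helpers called by `#add` are not shown on this page. The following is a rough sketch of how they might behave, not the library's actual implementation. It assumes that each report carries a process id under `report[:process][:id]` and a `:dispatched_at` timestamp, and that the aggregation time is the newest dispatch time among the stored reports. The class name `SketchAggregator` and both reader methods are hypothetical.

```ruby
# Hypothetical stand-in for an aggregator; it does not inherit from the
# real Base class and only mirrors the shape of #initialize and #add.
class SketchAggregator
  attr_reader :active_reports, :aggregated_from

  def initialize
    @active_reports = {}
  end

  def add(report)
    memoize_process_report(report)
    update_aggregated_from
  end

  private

  # Assumption: reports are keyed by process id, so a newer report from
  # the same process replaces the older one.
  def memoize_process_report(report)
    @active_reports[report[:process][:id]] = report
  end

  # Assumption: the aggregation time is the newest dispatch time among
  # the stored reports, never the wall clock.
  def update_aggregated_from
    @aggregated_from = @active_reports.each_value.map { |r| r[:dispatched_at] }.max
  end
end

aggregator = SketchAggregator.new
aggregator.add({ process: { id: 'p1' }, dispatched_at: 100 })
aggregator.add({ process: { id: 'p2' }, dispatched_at: 130 })
aggregator.add({ process: { id: 'p1' }, dispatched_at: 120 })
```

After these three calls the sketch holds two reports, one per process, and its time reference is the dispatch time of the newest stored report.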