Class: Karafka::Web::Processing::Consumers::Aggregators::Base
- Inherits:
-
Object
- Object
- Karafka::Web::Processing::Consumers::Aggregators::Base
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/processing/consumers/aggregators/base.rb
Overview
It is important to understand, that we operate here on a moment in time and this moment may not mean “current” now. There might have been a lag and we may be catching up on older states. This is why we use @aggregated_from
time instead of the real now. In case of a lag, we want to aggregate and catch up with data, without assigning it to the time of processing but aligning it with the time from which the given reports came. This allows us to compensate for the potential lag related to rebalances, downtimes, failures, etc.
Base for all the consumer related aggregators that operate on processes reports
Instance Method Summary collapse
-
#add(report) ⇒ Object
Adds report to the internal active reports hash and updates the aggregation time for internal time reference usage.
-
#initialize ⇒ Base
constructor
A new instance of Base.
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
22 23 24 |
# File 'lib/karafka/web/processing/consumers/aggregators/base.rb', line 22 def initialize @active_reports = {} end |
Instance Method Details
#add(report) ⇒ Object
Adds report to the internal active reports hash and updates the aggregation time for internal time reference usage
29 30 31 32 |
# File 'lib/karafka/web/processing/consumers/aggregators/base.rb', line 29 def add(report) memoize_process_report(report) update_aggregated_from end |