Class: Karafka::Web::Processing::Consumers::Aggregator
- Inherits:
-
Object
- Object
- Karafka::Web::Processing::Consumers::Aggregator
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/processing/consumers/aggregator.rb
Overview
Aggregator that tracks consumers processes states, aggregates the metrics and converts data points into a materialized current state.
Instance Method Summary collapse
-
#add(report, offset) ⇒ Object
Uses provided process state report to update the current materialized state.
-
#initialize ⇒ Aggregator
constructor
A new instance of Aggregator.
-
#to_json(*_args) ⇒ String
Json representation of the current processes state.
Constructor Details
#initialize ⇒ Aggregator
Returns a new instance of Aggregator.
13 14 15 16 |
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 13 def initialize # We keep whole reports for computation of active, current counters @active_reports = {} end |
Instance Method Details
#add(report, offset) ⇒ Object
Uses provided process state report to update the current materialized state
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 22 def add(report, offset) memoize_process_report(report) increment_total_counters(report) update_process_state(report, offset) # We always evict after counters updates because we want to use expired (stopped) # data for counters as it was valid previously. This can happen only when web consumer # had a lag and is catching up. evict_expired_processes # We could calculate this on a per request basis but this would require fetching all # the active processes for each view and we do not want that for performance reasons refresh_current_stats end |
#to_json(*_args) ⇒ String
Returns json representation of the current processes state.
37 38 39 |
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 37 def to_json(*_args) state.to_json end |