Class: Karafka::Web::Processing::Consumers::Aggregator

Inherits:
Object
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/processing/consumers/aggregator.rb

Overview

Aggregator that tracks consumers processes states, aggregates the metrics and converts data points into a materialized current state.

Instance Method Summary collapse

Constructor Details

#initializeAggregator

Returns a new instance of Aggregator.



13
14
15
16
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 13

def initialize
  # We keep whole reports for computation of active, current counters
  @active_reports = {}
end

Instance Method Details

#add(report, offset) ⇒ Object

Uses provided process state report to update the current materialized state

Parameters:

  • report (Hash)

    consumer process state report

  • offset (Integer)

    offset of the message with the state report. This offset is needed as we need to be able to get all the consumers reports from a given offset.



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 22

def add(report, offset)
  memoize_process_report(report)
  increment_total_counters(report)
  update_process_state(report, offset)
  # We always evict after counters updates because we want to use expired (stopped)
  # data for counters as it was valid previously. This can happen only when web consumer
  # had a lag and is catching up.
  evict_expired_processes
  # We could calculate this on a per request basis but this would require fetching all
  # the active processes for each view and we do not want that for performance reasons
  refresh_current_stats
end

#to_json(*_args) ⇒ String

Returns json representation of the current processes state.

Parameters:

  • _args (Object)

    extra parsing arguments (not used)

Returns:

  • (String)

    json representation of the current processes state



37
38
39
# File 'lib/karafka/web/processing/consumers/aggregator.rb', line 37

def to_json(*_args)
  state.to_json
end