Class: Karafka::Web::Processing::Consumers::Aggregators::Metrics
- Defined in:
- lib/karafka/web/processing/consumers/aggregators/metrics.rb
Overview
Aggregates metrics for metrics topic. Tracks consumers data and converts it into a state that can then be used to enrich previous time based states to get a time-series values for charts and metrics
Constant Summary collapse
- SCHEMA_VERSION =
Current schema version This is used for detecting incompatible changes and writing migrations
'1.3.0'
Instance Method Summary collapse
-
#add_report(report) ⇒ Object
Adds the current report to active reports and removes old once.
-
#add_stats(stats) ⇒ Object
Updates the aggregated stats metrics.
-
#initialize ⇒ Metrics
constructor
A new instance of Metrics.
-
#to_h ⇒ Hash
Converts our current knowledge into a report hash.
Methods inherited from Base
Constructor Details
#initialize ⇒ Metrics
Returns a new instance of Metrics.
16 17 18 19 20 |
# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 16 def initialize super @aggregated_tracker = TimeSeriesTracker.new(metrics.fetch(:aggregated)) @consumer_groups_tracker = TimeSeriesTracker.new(metrics.fetch(:consumer_groups)) end |
Instance Method Details
#add_report(report) ⇒ Object
Adds the current report to active reports and removes old once
25 26 27 28 29 |
# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 25 def add_report(report) add(report) evict_expired_processes add_consumers_groups_metrics end |
#add_stats(stats) ⇒ Object
Updates the aggregated stats metrics
34 35 36 37 38 39 |
# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 34 def add_stats(stats) metrics[:aggregated] = @aggregated_tracker.add( stats, @aggregated_from ) end |
#to_h ⇒ Hash
We materialize the consumers groups time series only here and not in real time, because we materialize it based on the tracked active collective state. Materializing on each update that would not be dispatched would be pointless.
Converts our current knowledge into a report hash.
48 49 50 51 52 53 54 55 |
# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 48 def to_h metrics[:schema_version] = SCHEMA_VERSION metrics[:dispatched_at] = float_now metrics[:aggregated] = @aggregated_tracker.to_h metrics[:consumer_groups] = @consumer_groups_tracker.to_h metrics end |