Class: Karafka::Web::Processing::Consumers::Aggregators::Metrics

Inherits:
Base
  • Object
Defined in:
lib/karafka/web/processing/consumers/aggregators/metrics.rb

Overview

Aggregates metrics for the metrics topic. Tracks consumer data and converts it into a state that can then be used to enrich previous time-based states, producing time-series values for charts and metrics.

Constant Summary

SCHEMA_VERSION = '1.3.0'

Current schema version. This is used for detecting incompatible changes and writing migrations.
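To illustrate how a version string like this can be used to detect incompatible changes, here is a minimal sketch. The `compatible_schema?` helper and its rule (same major version, not newer than the supported one) are assumptions made for illustration; the page does not show how Karafka Web actually compares versions or runs migrations.

```ruby
# Hypothetical helper, not part of Karafka Web.
# Assumed rule: stored data is readable when it shares the major version
# with the supported schema and is not newer than it.
def compatible_schema?(stored_version, supported: '1.3.0')
  stored = Gem::Version.new(stored_version)
  current = Gem::Version.new(supported)

  same_major = stored.segments.first == current.segments.first
  same_major && stored <= current
end

compatible_schema?('1.2.0') # older minor version, same major
compatible_schema?('2.0.0') # different major version
```

Comparing through `Gem::Version` rather than plain strings avoids lexical surprises such as `'1.10.0' < '1.9.0'`.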

Instance Method Summary

Methods inherited from Base

#add

Constructor Details

#initialize ⇒ Metrics

Returns a new instance of Metrics.



# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 16

def initialize
  super
  @aggregated_tracker = TimeSeriesTracker.new(metrics.fetch(:aggregated))
  @consumer_groups_tracker = TimeSeriesTracker.new(metrics.fetch(:consumer_groups))
end

Instance Method Details

#add_report(report) ⇒ Object

Adds the current report to the active reports and removes old ones.

Parameters:

  • report (Hash)

    single process full report



# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 25

def add_report(report)
  add(report)
  evict_expired_processes
  add_consumers_groups_metrics
end
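
The add-then-evict pattern used by `#add_report` can be sketched in isolation. Everything below is a hypothetical stand-in: the `ToyActiveReports` class, the `:process_id` and `:dispatched_at` keys, and the 30 second TTL are assumptions, since this page does not show how `add` or `evict_expired_processes` are implemented.

```ruby
# Hypothetical stand-in, not the Karafka Web implementation.
class ToyActiveReports
  # ttl is an assumed expiry window in seconds
  def initialize(ttl: 30)
    @ttl = ttl
    @active = {}
  end

  # Keeps only the latest report per process, then drops stale processes
  def add_report(report, now: Time.now.to_f)
    @active[report.fetch(:process_id)] = report
    evict_expired(now)
    self
  end

  def process_ids
    @active.keys
  end

  private

  # Removes every process whose last report is older than the TTL
  def evict_expired(now)
    @active.delete_if { |_id, report| now - report.fetch(:dispatched_at) > @ttl }
  end
end

store = ToyActiveReports.new(ttl: 30)
store.add_report({ process_id: 'a', dispatched_at: 100.0 }, now: 100.0)
store.add_report({ process_id: 'b', dispatched_at: 140.0 }, now: 140.0)
store.process_ids # 'a' last reported 40 seconds ago, so only 'b' is left
```

Running eviction on every add keeps the active set bounded without needing a separate cleanup timer.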

#add_stats(stats) ⇒ Object

Updates the aggregated stats metrics

Parameters:

  • stats (Hash)

    aggregated statistics



# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 34

def add_stats(stats)
  metrics[:aggregated] = @aggregated_tracker.add(
    stats,
    @aggregated_from
  )
end

#to_h ⇒ Hash

Note:

We materialize the consumer groups time series only here, not in real time, because it is derived from the tracked collective state of the active processes. Materializing it on every update, including updates that would never be dispatched, would be pointless.

Converts our current knowledge into a report hash.

Returns:

  • (Hash)

    Statistics hash



# File 'lib/karafka/web/processing/consumers/aggregators/metrics.rb', line 48

def to_h
  metrics[:schema_version] = SCHEMA_VERSION
  metrics[:dispatched_at] = float_now
  metrics[:aggregated] = @aggregated_tracker.to_h
  metrics[:consumer_groups] = @consumer_groups_tracker.to_h

  metrics
end
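
The note on `#to_h` describes deferred materialization: updates only refresh the tracked state, and a time-series point is produced when a report is about to be dispatched. A minimal sketch of that idea follows. `ToyLazySeries`, its `update` method, and the `[time, state]` point layout are assumptions for illustration and do not reflect how `TimeSeriesTracker` stores data.

```ruby
# Hypothetical stand-in, not the Karafka Web implementation.
class ToyLazySeries
  def initialize
    @current = nil
    @points = []
  end

  # Cheap path: remember the latest state, build nothing
  def update(state)
    @current = state
  end

  # Expensive path: append one [time, state] point, only when dispatching
  def to_h(now: Time.now.to_f)
    @points << [now, @current] if @current
    { dispatched_at: now, points: @points.dup }
  end
end

series = ToyLazySeries.new
series.update({ lag: 5 })
series.update({ lag: 3 }) # overwrites the previous state, no point created
snapshot = series.to_h(now: 10.0)
snapshot[:points].size # a single point, built from the latest state
```

Intermediate updates that are overwritten before a dispatch never cost a materialization, which is the trade-off the note describes.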