Class: Karafka::Web::Processing::Consumers::Aggregators::State

Inherits:
Base
  • Object
Defined in:
lib/karafka/web/processing/consumers/aggregators/state.rb

Overview

Aggregator that tracks consumer process states, aggregates their metrics and converts data points into a materialized current state.

There are two types of metrics:

  • totals - metrics that represent absolute values, like the total number of messages processed. They need to be incremented or updated with each incoming consumer process report. They cannot be “batch computed” because they do not represent a state at a point in time but progress.

  • aggregated state - a state that represents a “snapshot” of what is happening right now, where "right now" is the moment in time on which we operate.
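To make the distinction concrete, here is a minimal Ruby sketch. The class `MetricsSketch`, its fields, and the assumption that each report carries the number of messages processed since that process's previous report are all hypothetical; this is not Karafka's implementation.

```ruby
# Hypothetical illustration of the two metric kinds; names are not Karafka's API.
class MetricsSketch
  attr_reader :totals, :processes

  def initialize
    # Totals accumulate progress and are never recomputed from scratch
    @totals = { messages: 0 }
    # Latest known report per process, used to build the snapshot
    @processes = {}
  end

  # Assumes report[:processed] is a delta since the previous report
  def add(process_id, report)
    # A total must be bumped by every incoming report
    @totals[:messages] += report[:processed]
    # The snapshot only cares about the most recent report of each process
    @processes[process_id] = report
  end

  # Snapshot: recomputed from the currently known processes only
  def snapshot
    { busy: @processes.values.sum { |r| r[:busy] } }
  end
end

metrics = MetricsSketch.new
metrics.add('p1', processed: 10, busy: 2)
metrics.add('p1', processed: 5, busy: 1)
metrics.add('p2', processed: 3, busy: 4)
metrics.totals[:messages] # => 18
metrics.snapshot          # => {:busy=>5}
```

A total keeps every report's contribution, while the snapshot is rebuilt from the latest report of each process, so an older report for the same process simply stops counting toward it.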

Constant Summary

SCHEMA_VERSION =

Current schema version. This can be used in the future to detect incompatible changes and to write migrations.

'1.4.0'
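One way a version string like this could drive such decisions is sketched below. The helper `schema_status` and its three outcomes are hypothetical; the source only says the version may be used for this purpose in the future.

```ruby
require 'rubygems' # Gem::Version ships with RubyGems

# Hypothetical constant mirroring the documented value
CURRENT_SCHEMA_VERSION = Gem::Version.new('1.4.0')

# Hypothetical helper: classifies a stored state's schema version.
# Returns :ok, :migrate (stored state is older) or :incompatible (stored state is newer)
def schema_status(stored_version)
  stored = Gem::Version.new(stored_version)

  if stored == CURRENT_SCHEMA_VERSION
    :ok
  elsif stored < CURRENT_SCHEMA_VERSION
    :migrate
  else
    :incompatible
  end
end

schema_status('1.4.0')  # => :ok
schema_status('1.3.0')  # => :migrate
schema_status('1.10.0') # => :incompatible
```

Comparing with `Gem::Version` rather than plain strings matters once a segment reaches two digits: as strings, `'1.10.0'` sorts before `'1.4.0'`.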

Instance Method Summary

Constructor Details

#initialize(schema_manager) ⇒ State

Returns a new instance of State.

Parameters:



# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 27

def initialize(schema_manager)
  super()
  @schema_manager = schema_manager
end

Instance Method Details

#add(report, offset) ⇒ Object

Uses the provided process state report to update the current materialized state.

Parameters:

  • report (Hash)

    consumer process state report

  • offset (Integer)

    Offset of the message carrying the state report. It is needed because we must be able to fetch all consumer reports starting from a given offset.



# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 36

def add(report, offset)
  super(report)
  increment_total_counters(report)
  update_process_state(report, offset)
  # We always evict after the counter updates because expired (stopped) process data
  # should still count toward the totals, as it was valid when reported. This can happen
  # only when the web consumer had a lag and is catching up.
  evict_expired_processes
  # "current" means current in the context of the processing window (usually now, but in
  # case of lag, this state may be from the past)
  refresh_current_stats
end
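The ordering comments in `#add` can be illustrated with a simplified model. `OrderingSketch`, the `TTL` value, and the `:id`, `:dispatched_at` and `:processed` report fields are assumptions made for this sketch, not the aggregator's actual internals.

```ruby
# Hypothetical sketch of the ordering in #add; TTL and field names are assumptions.
class OrderingSketch
  TTL = 10.0 # seconds a process report stays "alive" (assumed value)

  attr_reader :total_processed, :processes

  def initialize
    @total_processed = 0
    @processes = {}
    @window_time = 0.0
  end

  def add(report)
    # The processing window moves to the time of the newest report seen
    @window_time = [@window_time, report[:dispatched_at]].max
    # 1. Counters first: even a report that is about to expire was valid when sent
    @total_processed += report[:processed]
    # 2. Record the latest state of this process
    @processes[report[:id]] = report
    # 3. Evict processes whose last report is older than TTL relative to the window
    @processes.delete_if { |_, r| @window_time - r[:dispatched_at] > TTL }
  end
end

sketch = OrderingSketch.new
sketch.add(id: 'a', dispatched_at: 0.0, processed: 5)
sketch.add(id: 'b', dispatched_at: 20.0, processed: 2)
# A late report from 'a' arrives while catching up on lag
sketch.add(id: 'a', dispatched_at: 5.0, processed: 3)
sketch.total_processed # => 10
sketch.processes.keys  # => ["b"]
```

Because counting happens before eviction, the late report from `'a'` still contributes to the total even though the process is immediately dropped from the current state.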

#stats ⇒ Array<Hash, Float>

Note:

We return a copy because the internal one is used to track state changes; if we returned it directly, other aggregators could mutate it in unexpected ways.

Returns the aggregated current stats value and the time this aggregation comes from.

Returns:

  • (Array<Hash, Float>)

    aggregated current stats value and the time this aggregation comes from



# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 55

def stats
  state.fetch(:stats).dup
end
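The effect of returning a copy can be seen with a small stand-in. `StatsHolder` and its fields are hypothetical; only the `fetch(:stats).dup` pattern comes from the method above.

```ruby
# Hypothetical stand-in for the aggregator's internal state
class StatsHolder
  def initialize
    @state = { stats: { busy: 1, enqueued: 0 } }
  end

  # Same pattern as #stats: hand out a copy of the stats hash
  def stats
    @state.fetch(:stats).dup
  end

  def internal_busy
    @state[:stats][:busy]
  end
end

holder = StatsHolder.new
copy = holder.stats
copy[:busy] = 99      # mutating the copy...
holder.internal_busy  # => 1, the internal value is untouched
```

`Hash#dup` is a shallow copy: if the stats hash held nested hashes or arrays, those inner objects would still be shared with the internal state, so a caller mutating them in place would affect the aggregator.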

#to_h(*_args) ⇒ Hash

Sets the schema version, dispatch time and schema state, and returns the hash that can be shipped to the states topic.

Parameters:

  • _args (Object)

    extra parsing arguments (not used)

Returns:

  • (Hash)

    Hash that we can use to ship states data to Kafka



# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 63

def to_h(*_args)
  state[:schema_version] = SCHEMA_VERSION
  state[:dispatched_at] = float_now
  state[:schema_state] = @schema_manager.to_s

  state
end
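A minimal sketch of what `#to_h` produces, using a stand-in schema manager. `FakeSchemaManager`, its `'compatible'` return value, and the use of `Time.now.to_f` in place of `float_now` are assumptions for illustration.

```ruby
require 'json'

# Hypothetical stand-in for the schema manager; only #to_s is used by #to_h
class FakeSchemaManager
  def to_s
    'compatible' # assumed value
  end
end

# Hypothetical simplified state holder mirroring the documented #to_h
class StateSketch
  SCHEMA_VERSION = '1.4.0'

  def initialize(schema_manager)
    @schema_manager = schema_manager
    @state = { processes: {} }
  end

  def to_h(*_args)
    @state[:schema_version] = SCHEMA_VERSION
    # Assumption: float_now returns epoch seconds as a Float
    @state[:dispatched_at] = Time.now.to_f
    @state[:schema_state] = @schema_manager.to_s
    @state
  end
end

payload = StateSketch.new(FakeSchemaManager.new).to_h
# Serialized form that could be produced to the states topic
puts JSON.generate(payload)
```

Unlike `#stats`, `#to_h` stamps and returns the internal `state` object itself rather than a copy, so the metadata it sets remains in the aggregator's state after the call.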