Class: Karafka::Web::Processing::Consumers::Aggregators::State
- Defined in:
- lib/karafka/web/processing/consumers/aggregators/state.rb
Overview
Aggregator that tracks consumers processes states, aggregates the metrics and converts data points into a materialized current state.
There are two types of metrics: - totals - metrics that represent absolute values like number of messages processed in total. Things that need to be incremented/updated with each incoming consumer process report. They cannot be “batch computed” because they do not represent a a state of time but progress. - aggregated state - a state that represents a “snapshot” of things happening right now. Right now is the moment of time on which we operate.
Constant Summary collapse
- SCHEMA_VERSION =
Current schema version This can be used in the future for detecting incompatible changes and writing migrations
'1.4.0'
Instance Method Summary collapse
-
#add(report, offset) ⇒ Object
Uses provided process state report to update the current materialized state.
-
#initialize(schema_manager) ⇒ State
constructor
A new instance of State.
-
#stats ⇒ Array<Hash, Float>
Aggregated current stats value and time from which this aggregation comes from.
-
#to_h(*_args) ⇒ Hash
Sets the dispatch time and returns the hash that can be shipped to the states topic.
Constructor Details
#initialize(schema_manager) ⇒ State
Returns a new instance of State.
27 28 29 30 |
# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 27 def initialize(schema_manager) super() @schema_manager = schema_manager end |
Instance Method Details
#add(report, offset) ⇒ Object
Uses provided process state report to update the current materialized state
36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 36 def add(report, offset) super(report) increment_total_counters(report) update_process_state(report, offset) # We always evict after counters updates because we want to use expired (stopped) # data for counters as it was valid previously. This can happen only when web consumer # had a lag and is catching up. evict_expired_processes # current means current in the context of processing window (usually now but in case # of lag, this state may be from the past) refresh_current_stats end |
#stats ⇒ Array<Hash, Float>
We return a copy, because we use the internal one to track state changes and unless we would return a copy, other aggregators could have this mutated in an unexpected way
Returns aggregated current stats value and time from which this aggregation comes from.
55 56 57 |
# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 55 def stats state.fetch(:stats).dup end |
#to_h(*_args) ⇒ Hash
Sets the dispatch time and returns the hash that can be shipped to the states topic
63 64 65 66 67 68 69 |
# File 'lib/karafka/web/processing/consumers/aggregators/state.rb', line 63 def to_h(*_args) state[:schema_version] = SCHEMA_VERSION state[:dispatched_at] = float_now state[:schema_state] = @schema_manager.to_s state end |