Class: Karafka::Web::Processing::TimeSeriesTracker

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/processing/time_series_tracker.rb

Overview

Note:

We always publish absolute metrics, not deltas relative to a given time window. Deltas need to be computed in the frontend, because we want the storage to hold state facts.

Note:

We evict and clean up data only right before it is used. This puts more pressure on memory but makes the tracker 70-90% faster. Since we sample every few seconds by default anyway, this trade-off makes sense.

Allows us to accumulate and track time series data with a given resolution.

We aggregate for the last:

  • 7 days (every 8 hours)
  • 24 hours (every 30 minutes)
  • 1 hour (every minute)
  • 5 minutes (every 5 seconds)

plus the most recent sample, refreshed on every update (leading).
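Because the stored samples are absolute values, whoever reads them has to derive per-interval deltas. The sketch below shows one way to do that. The `deltas` helper and the `[time, value]` sample shape are assumptions made for illustration, not part of karafka-web, and the real frontend may compute this differently.

```ruby
# Hypothetical helper, not part of karafka-web: turns absolute samples into
# per-interval deltas. Each sample is assumed to be a [time, value] pair.
def deltas(samples)
  # Walk over consecutive pairs and subtract the earlier value from the later
  samples.each_cons(2).map do |(_, previous), (time, current)|
    [time, current - previous]
  end
end

# Three displayed points need four stored samples: the first one is only
# the baseline for the first delta
samples = [[0, 100], [60, 130], [120, 130], [180, 175]]

deltas(samples) # => [[60, 30], [120, 0], [180, 45]]
```

Note that N stored samples yield N - 1 deltas, so one extra sample has to be kept as the baseline for the first displayed point.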

Constant Summary

TIME_RANGES =
Note:

We add one more than we want to display for delta computation when ranges are full in the UI.

How many samples and in what resolution should we track for a given time range.

{
  # 7 days sampling
  days: {
    # Sample every 8 hours so we end up with 56 samples over a week + 1 for baseline
    resolution: 8 * 60 * 60,
    limit: 57
  }.freeze,
  # 24 hours sampling
  hours: {
    # Every 30 minutes for 24 hours + baseline
    resolution: 30 * 60,
    limit: 49
  }.freeze,
  # 60 minutes sampling
  minutes: {
    # Every one minute for an hour => 60 samples
    resolution: 60,
    limit: 61
  }.freeze,
  # 5 minutes sampling
  seconds: {
    # Every 5 seconds with 60 samples + baseline. That is 300 seconds => 5 minutes
    resolution: 5,
    limit: 61
  }.freeze
}.freeze
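To make the roles of `resolution` and `limit` concrete, here is a minimal sketch of how a range could be thinned out. It assumes samples are `[time, value]` pairs, keeps the first sample in each `resolution`-sized bucket plus the most recent one, and then truncates to `limit`. `SKETCH_RANGES` and `evict_range` are hypothetical names, and the tracker's private eviction logic is not shown on this page, so it may differ.

```ruby
# Illustrative subset mirroring the resolution/limit shape of TIME_RANGES
SKETCH_RANGES = {
  minutes: { resolution: 60, limit: 61 },
  seconds: { resolution: 5, limit: 61 }
}.freeze

# Hypothetical sketch: samples are assumed to be [time, value] pairs
def evict_range(samples, resolution:, limit:)
  # Keep the first sample that falls into each resolution-sized bucket
  kept = samples.group_by { |time, _| (time / resolution).floor }.values.map(&:first)
  # Always keep the most recent sample as the leading value
  kept << samples.last unless samples.empty?
  # Drop duplicated timestamps, restore time order, keep the newest `limit`
  kept.uniq(&:first).sort_by(&:first).last(limit)
end

# One sample per second for 13 seconds
samples = (0..12).map { |time| [time, time * 10] }

evict_range(samples, **SKETCH_RANGES[:seconds]).map(&:first) # => [0, 5, 10, 12]
evict_range(samples, resolution: 5, limit: 3).map(&:first)   # => [5, 10, 12]
```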

Instance Method Summary

Constructor Details

#initialize(existing) ⇒ TimeSeriesTracker

Returns a new instance of TimeSeriesTracker.

Parameters:

  • existing (Hash)

    existing historical metrics (may be empty for the first state)



# File 'lib/karafka/web/processing/time_series_tracker.rb', line 54

def initialize(existing)
  # Builds an empty structure for potential time ranges we are interested in
  @historicals = TIME_RANGES.keys.map { |name| [name, []] }.to_h

  # Fetch the existing (if any) historical values that we already have
  import_existing(existing)
end

Instance Method Details

#add(current, state_time) ⇒ Object

Adds the current state to the tracked historicals

Parameters:

  • current (Hash)

    hash with current state

  • state_time (Float)

    float UTC time from which the state comes



# File 'lib/karafka/web/processing/time_series_tracker.rb', line 65

def add(current, state_time)
  # Inject the time point into all the historicals
  inject(current, state_time)
end

#to_h ⇒ Hash

Evicts expired and duplicated series and returns the cleaned hash

Returns:

  • (Hash)

    aggregated historicals hash



# File 'lib/karafka/web/processing/time_series_tracker.rb', line 72

def to_h
  evict

  @historicals
end