Class: Karafka::Pro::ScheduledMessages::Tracker

Inherits:
Object
Defined in:
lib/karafka/pro/scheduled_messages/tracker.rb

Overview

Tracks basic state and metrics about schedules that are to be dispatched.

It provides an accurate dispatch count for today, taken from the daily buffer, and estimates for future days.

Constructor Details

#initialize ⇒ Tracker

Initializes the tracker with empty statistics



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 22

def initialize
  @daily = Hash.new { |h, k| h[k] = 0 }
  @started_at = Time.now.to_i
  @offsets = { low: -1, high: -1 }
  @state = 'fresh'
  @reloads = 0
end

Instance Attribute Details

#reloads=(value) ⇒ Object (writeonly)

Sets the attribute reloads

Parameters:

  • value

    the value to set the attribute reloads to.



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 16

def reloads=(value)
  @reloads = value
end

#started_at ⇒ Integer (readonly)

Returns the Unix epoch time (in seconds) at which this tracker was started.

Returns:

  • (Integer)

    Unix epoch time (in seconds) at which this tracker was started



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 19

def started_at
  @started_at
end

#state ⇒ String

Returns current state.

Returns:

  • (String)

    current state



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 14

def state
  @state
end

Instance Method Details

#future(message) ⇒ Object

Tracks a future message dispatch.

It is only relevant for future days, because for today we use accurate metrics from the daily buffer.

Parameters:

  • message (Karafka::Messages::Message)

    Schedule message. It should not be a tombstone message. Cancellations made through tombstone messages are not tracked because doing so would drastically increase complexity. For the current day we use the accurate counter, and for future days we use estimates.



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 56

def future(message)
  epoch = message.headers['schedule_target_epoch']

  @daily[epoch_to_date(epoch)] += 1
end
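
The per-day counting described above can be sketched in isolation. The `epoch_to_date` helper is private and not shown on this page, so the version below (epoch seconds converted to a UTC `YYYY-MM-DD` string) is an assumption, as are the `DailyCounterSketch` class name and the use of a plain Hash in place of `message.headers`.

```ruby
require 'date'

# Hypothetical stand-in for the tracker's per-day counter; not the Karafka class.
class DailyCounterSketch
  attr_reader :daily

  def initialize
    # Days that were never seen default to 0 and are stored on first access
    @daily = Hash.new { |h, k| h[k] = 0 }
  end

  # headers: a Hash that mimics message.headers
  def future(headers)
    epoch = headers['schedule_target_epoch']
    @daily[epoch_to_date(epoch)] += 1
  end

  private

  # Assumed implementation: epoch seconds -> "YYYY-MM-DD" in UTC.
  # Integer() accepts both integers and numeric strings.
  def epoch_to_date(epoch)
    Time.at(Integer(epoch)).utc.to_date.to_s
  end
end

counter = DailyCounterSketch.new
counter.future('schedule_target_epoch' => 86_400)      # 1970-01-02 UTC
counter.future('schedule_target_epoch' => 86_400 + 60) # same day
counter.future('schedule_target_epoch' => 172_800)     # 1970-01-03 UTC
p counter.daily # two dispatches counted for 1970-01-02, one for 1970-01-03
```

Because cancellations are not subtracted, counts built this way can only overestimate, which matches the "estimates for future days" wording above.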

#offsets(message) ⇒ Object

Tracks offsets of visited messages

Parameters:

  • message

    visited message; its offset is read via message.offset



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 33

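def offsets(message)
  message_offset = message.offset

  # The low watermark is set once, by the first message seen
  @offsets[:low] = message_offset if @offsets[:low].negative?
  # The high watermark always follows the most recent message
  @offsets[:high] = message_offset
end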
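
A minimal sketch of how the low and high values evolve as messages are visited. `MessageSketch` and `OffsetRangeSketch` are hypothetical stand-ins introduced for illustration; only the update logic mirrors the method above.

```ruby
# Hypothetical message stand-in that exposes only #offset
MessageSketch = Struct.new(:offset)

# Hypothetical holder for the same low/high bookkeeping
class OffsetRangeSketch
  attr_reader :offsets

  def initialize
    # -1 marks "nothing seen yet", as in the tracker constructor
    @offsets = { low: -1, high: -1 }
  end

  def track(message)
    offset = message.offset

    # First visited message fixes the low value
    @offsets[:low] = offset if @offsets[:low].negative?
    # Every visited message overwrites the high value
    @offsets[:high] = offset
  end
end

range = OffsetRangeSketch.new
[5, 6, 9].each { |o| range.track(MessageSketch.new(o)) }
p range.offsets # low stays at 5 (first seen), high ends at 9 (last seen)
```

Note that the high value is the last offset seen rather than the maximum. The two coincide when messages are visited in increasing offset order, which is how offsets are assigned within a single Kafka partition.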

#to_h ⇒ Hash

Returns hash with details that we want to expose.

Returns:

  • (Hash)

    hash with details that we want to expose



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 63

def to_h
  {
    state: @state,
    offsets: @offsets,
    daily: @daily,
    started_at: @started_at,
    reloads: @reloads
  }.freeze
end
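
One detail of the code above that is easy to overlook: `freeze` in Ruby is shallow, so only the outer hash is frozen while the nested `offsets` and `daily` hashes stay mutable. The sketch below illustrates this with stand-in values; `snapshot_sketch` is a hypothetical helper, not part of Karafka.

```ruby
# Builds a frozen hash the same way to_h does, using stand-in values
def snapshot_sketch
  daily = Hash.new { |h, k| h[k] = 0 }
  offsets = { low: -1, high: -1 }

  { offsets: offsets, daily: daily }.freeze
end

snapshot = snapshot_sketch

puts snapshot.frozen?           # outer hash is frozen
puts snapshot[:offsets].frozen? # nested hash is not

begin
  snapshot[:state] = 'loaded'
rescue FrozenError => e
  puts "outer hash rejected the write: #{e.class}"
end

# Nested hashes still accept writes
snapshot[:offsets][:high] = 10

# Reading a missing day inserts it, because of the default block
snapshot[:daily]['2030-01-01']
puts snapshot[:daily].key?('2030-01-01')
```

Callers that need a fully independent snapshot would have to copy the nested hashes themselves; whether that matters depends on how the result is consumed.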

#today=(sum) ⇒ Object

Sets the number of messages scheduled for dispatch today. This number is accurate because it comes from the daily buffer.

Parameters:

  • sum (Integer)

    accurate number of messages to dispatch today


# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 43

def today=(sum)
  @daily[epoch_to_date(@started_at)] = sum
end