Class: Karafka::Pro::ScheduledMessages::Tracker

Inherits:
Object
Defined in:
lib/karafka/pro/scheduled_messages/tracker.rb

Overview

Tracks basic state and metrics about schedules to be dispatched.

It provides an accurate dispatch count for today, taken from the daily buffer, and estimates for future days.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 21

def initialize
  @daily = Hash.new { |h, k| h[k] = 0 }
  @started_at = Time.now.to_i
  @offsets = { low: -1, high: -1 }
  @state = 'fresh'
  @reloads = 0
end
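
The `@daily` counter relies on a `Hash` default block, so a date seen for the first time starts at zero and can be incremented immediately. A minimal standalone sketch of that idiom follows; the date keys are made-up sample values, not output from Karafka.

```ruby
# Standalone illustration of the default-block idiom used for @daily.
daily = Hash.new { |hash, key| hash[key] = 0 }

# Incrementing a missing key first stores 0 under it, then adds 1.
daily['2024-01-01'] += 1
daily['2024-01-01'] += 1
daily['2024-01-02'] += 1

puts daily.inspect
```

Because the block assigns the default into the hash, every date that is read or incremented ends up as a real key, which is what later lets the whole hash be exposed as-is.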

Instance Attribute Details

#reloads=(value) ⇒ Object (writeonly)

Sets the attribute reloads

Parameters:

  • value

    the value to set the attribute reloads to.



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 16

def reloads=(value)
  @reloads = value
end

#started_at ⇒ Integer (readonly)

Returns the epoch time (in seconds) at which this tracker was started.

Returns:

  • (Integer)

    epoch time (in seconds) at which this tracker was started



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 19

def started_at
  @started_at
end

#state ⇒ String

Returns current state.

Returns:

  • (String)

    current state



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 14

def state
  @state
end

Instance Method Details

#future(message) ⇒ Object

Tracks a future message dispatch.

It is only relevant for future days, because for today we use accurate metrics from the daily buffer.

Parameters:

  • message (Karafka::Messages::Message)

    schedule message. Must not be a tombstone message. Cancellations made via tombstone messages are not tracked because doing so would drastically increase complexity. For the current day we use the accurate counter, and for future days we use estimates.



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 55

def future(message)
  epoch = message.headers['schedule_target_epoch']

  @daily[epoch_to_date(epoch)] += 1
end
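
To make the per-day estimate concrete, here is a minimal sketch of the counting that `#future` performs. It rests on assumptions: `FakeScheduleMessage` is a hypothetical stand-in for `Karafka::Messages::Message`, the `epoch_to_date` body is a guess at the private helper (which this page does not show), and the header value is assumed to be, or be convertible to, an integer epoch.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message, exposing only #headers.
FakeScheduleMessage = Struct.new(:headers)

# Assumed equivalent of the private epoch_to_date helper, which this page
# does not show: format the epoch as a UTC calendar date string.
def epoch_to_date(epoch)
  Time.at(Integer(epoch)).utc.strftime('%Y-%m-%d')
end

daily = Hash.new { |hash, key| hash[key] = 0 }

# Three schedules: two target 2023-11-14 (UTC), one targets the following day.
[1_700_000_000, 1_700_000_100, 1_700_000_000 + 86_400].each do |epoch|
  message = FakeScheduleMessage.new({ 'schedule_target_epoch' => epoch })
  daily[epoch_to_date(message.headers['schedule_target_epoch'])] += 1
end

puts daily.inspect
```

Since cancellations are never subtracted, counts built this way can only grow, which is why they are treated as estimates for future days.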

#offsets(message) ⇒ Object

Tracks offsets of visited messages

Parameters:

  • message (Karafka::Messages::Message)


# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 32

def offsets(message)
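  message_offset = message.offset

  # Record the first visited offset once; -1 means nothing has been seen yet
  @offsets[:low] = message_offset if @offsets[:low].negative?
  # Always move the high mark to the most recently visited offset
  @offsets[:high] = message_offset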
end
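
The effect of `#offsets` over several messages can be sketched as follows. `FakeOffsetMessage` is a hypothetical stand-in that exposes only `#offset`, and the offsets 40 to 42 are sample values.

```ruby
# Hypothetical stand-in for a consumed message, exposing only #offset.
FakeOffsetMessage = Struct.new(:offset)

# Same initial shape as in the constructor: -1 means "nothing visited yet".
offsets = { low: -1, high: -1 }

[40, 41, 42].each do |offset|
  message = FakeOffsetMessage.new(offset)

  # The low mark is written only while it is still negative (first message).
  offsets[:low] = message.offset if offsets[:low].negative?
  # The high mark follows every visited message.
  offsets[:high] = message.offset
end

puts offsets.inspect
```

In this sketch the low mark stays at the first visited offset while the high mark ends at the last one, so the pair describes the range of messages the tracker has seen.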

#to_h ⇒ Hash

Returns hash with details that we want to expose.

Returns:

  • (Hash)

    hash with details that we want to expose



# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 62

def to_h
  {
    state: @state,
    offsets: @offsets,
    daily: @daily,
    started_at: @started_at,
    reloads: @reloads
  }.freeze
end
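
One detail worth keeping in mind is that `freeze` in Ruby is shallow. The hash returned by `#to_h` cannot be modified itself, but the nested `@offsets` and `@daily` hashes it references are the tracker's own objects and are not frozen by this call. A minimal standalone sketch of that behavior, using sample values rather than a real tracker:

```ruby
offsets = { low: 40, high: 42 }
snapshot = { offsets: offsets, reloads: 0 }.freeze

# The outer hash rejects new or changed keys...
outer_error = begin
  snapshot[:reloads] = 1
  nil
rescue FrozenError => e
  e
end

# ...but the nested hash is the same unfrozen object and still changes.
offsets[:high] = 43

puts outer_error.class
puts snapshot[:offsets][:high]
```

A caller that needs a stable snapshot would therefore have to copy the nested hashes; whether Karafka does so elsewhere is not shown on this page.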

#today=(sum) ⇒ Object

Sets the accurate (because it comes from the daily buffer) number of messages scheduled for dispatch today.

Parameters:

  • sum (Integer)


# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 42

def today=(sum)
  @daily[epoch_to_date(@started_at)] = sum
end
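
Note that `today=` assigns rather than increments, and that the key is derived from `@started_at` rather than from the current time. The sketch below illustrates the assignment semantics under the same assumption about the private `epoch_to_date` helper as before (its body is a guess, since this page does not show it); the epoch and counts are sample values.

```ruby
# Assumed equivalent of the private epoch_to_date helper (not shown on this page).
def epoch_to_date(epoch)
  Time.at(Integer(epoch)).utc.strftime('%Y-%m-%d')
end

started_at = 1_700_000_000 # sample start epoch, 2023-11-14 in UTC
daily = Hash.new { |hash, key| hash[key] = 0 }

# An estimate accumulated earlier for the same date...
daily[epoch_to_date(started_at)] += 5

# ...is replaced, not added to, by the accurate sum from the daily buffer.
daily[epoch_to_date(started_at)] = 3

puts daily.inspect
```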