Class: Karafka::Pro::ScheduledMessages::Tracker
- Inherits:
- Object
- Defined in:
- lib/karafka/pro/scheduled_messages/tracker.rb
Overview
Tracks basic state and metrics about schedules to be dispatched.
It provides an accurate dispatch count for today, taken from the daily buffer, and estimates for future days.
Instance Attribute Summary
-
#reloads ⇒ Object
writeonly
Sets the attribute reloads.
-
#started_at ⇒ Integer
readonly
Time epoch when this tracker was started.
-
#state ⇒ String
Current state.
Instance Method Summary
-
#future(message) ⇒ Object
Tracks future message dispatch.
-
#initialize ⇒ Tracker
constructor
A new instance of Tracker.
-
#offsets(message) ⇒ Object
Tracks offsets of visited messages.
-
#to_h ⇒ Hash
Hash with details that we want to expose.
-
#today=(sum) ⇒ Object
Sets the number of messages to dispatch today (accurate because it comes from the daily buffer).
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 21

    def initialize
      @daily = Hash.new { |h, k| h[k] = 0 }
      @started_at = Time.now.to_i
      @offsets = { low: -1, high: -1 }
      @state = 'fresh'
      @reloads = 0
    end
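The constructor builds `@daily` with a default block, so any date key that has not been seen yet starts at zero. The following is a minimal standalone sketch of that counter pattern only; `build_daily_counter` and the date strings are illustrative names, not part of Karafka.

```ruby
# Illustrative helper (hypothetical name): builds a counter hash the same way
# the constructor builds @daily.
def build_daily_counter
  Hash.new { |hash, key| hash[key] = 0 }
end

daily = build_daily_counter

# A missing key is initialized to 0 by the default block, then incremented.
daily['2024-05-01'] += 1
daily['2024-05-01'] += 1
daily['2024-05-02'] += 1

daily.each { |date, count| puts "#{date}: #{count}" }
```

Because the default block assigns into the hash, merely reading a missing key also stores it with a value of 0.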
Instance Attribute Details
#reloads=(value) ⇒ Object (writeonly)
Sets the attribute reloads
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 16

    def reloads=(value)
      @reloads = value
    end
#started_at ⇒ Integer (readonly)
Returns time epoch when this tracker was started.
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 19

    def started_at
      @started_at
    end
#state ⇒ String
Returns current state.
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 14

    def state
      @state
    end
Instance Method Details
#future(message) ⇒ Object
Tracks future message dispatch.
It is only relevant for future days; for today, accurate metrics come from the daily buffer.
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 55

    def future(message)
      epoch = message.headers['schedule_target_epoch']

      @daily[epoch_to_date(epoch)] += 1
    end
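The per-day estimate built by `#future` can be sketched outside Karafka as follows. This is an assumption-laden illustration: `epoch_to_date` is a private helper whose body is not shown on this page, so the version below (UTC date string via `strftime`) is a guess at its behavior, and `FutureMessage` and `count_future` are hypothetical names standing in for a real message object and the tracker.

```ruby
# Hypothetical stand-in for the private epoch_to_date helper; the real
# implementation is not shown here. Assumes the epoch is in seconds.
def epoch_to_date(epoch)
  Time.at(Integer(epoch)).utc.strftime('%Y-%m-%d')
end

# Minimal message double exposing only #headers (hypothetical).
FutureMessage = Struct.new(:headers)

# Counts messages per target date, mirroring the increment done in #future.
def count_future(messages)
  daily = Hash.new { |hash, key| hash[key] = 0 }

  messages.each do |message|
    epoch = message.headers['schedule_target_epoch']
    daily[epoch_to_date(epoch)] += 1
  end

  daily
end

messages = [
  FutureMessage.new({ 'schedule_target_epoch' => 1_700_000_000 }),
  FutureMessage.new({ 'schedule_target_epoch' => 1_700_000_100 }),
  FutureMessage.new({ 'schedule_target_epoch' => 1_700_000_000 + 86_400 })
]

count_future(messages).each { |date, count| puts "#{date}: #{count}" }
```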
#offsets(message) ⇒ Object
Tracks offsets of visited messages.

    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 32

    def offsets(message)
      message_offset = message.offset

      @offsets[:low] = message_offset if @offsets[:low].negative?
      @offsets[:high] = message.offset
    end
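The low/high bookkeeping in `#offsets` relies on `-1` as a "not yet set" sentinel: the low offset is recorded once, on the first visited message, while the high offset follows the most recent one. A minimal standalone sketch of that logic, where `OffsetMessage` and `OffsetRange` are hypothetical names and not Karafka classes:

```ruby
# Minimal message double exposing only #offset (hypothetical).
OffsetMessage = Struct.new(:offset)

# Hypothetical stand-in that isolates the offset tracking logic.
class OffsetRange
  attr_reader :offsets

  def initialize
    # -1 marks "no message seen yet"; valid offsets start at 0.
    @offsets = { low: -1, high: -1 }
  end

  def track(message)
    message_offset = message.offset

    # Set low only once, on the first message seen.
    @offsets[:low] = message_offset if @offsets[:low].negative?
    # High always follows the latest visited message.
    @offsets[:high] = message_offset
  end
end

range = OffsetRange.new
[10, 11, 15].each { |offset| range.track(OffsetMessage.new(offset)) }

puts "low=#{range.offsets[:low]} high=#{range.offsets[:high]}"
```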
#to_h ⇒ Hash
Returns hash with details that we want to expose.
    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 62

    def to_h
      {
        state: @state,
        offsets: @offsets,
        daily: @daily,
        started_at: @started_at,
        reloads: @reloads
      }.freeze
    end
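Note that `freeze` in Ruby is shallow: the hash returned by `#to_h` cannot gain or lose keys, but the nested `offsets` and `daily` hashes are the same objects the tracker keeps mutating. The sketch below illustrates this with plain hashes; `shallow_snapshot` and `isolated_snapshot` are hypothetical helpers, and the copying variant is only one possible approach for a caller that needs a stable view, not something the tracker itself does.

```ruby
# Mirrors the shape of #to_h: only the outer hash is frozen.
def shallow_snapshot(offsets, daily)
  { offsets: offsets, daily: daily }.freeze
end

# Hypothetical alternative: copy and freeze nested hashes for a stable view.
def isolated_snapshot(offsets, daily)
  { offsets: offsets.dup.freeze, daily: daily.dup.freeze }.freeze
end

offsets = { low: 3, high: 9 }
daily = { '2024-05-01' => 2 }

shallow = shallow_snapshot(offsets, daily)
isolated = isolated_snapshot(offsets, daily)

# Later tracking updates the original hash...
offsets[:high] = 10

# ...which is visible through the shallow snapshot but not the isolated one.
puts "shallow high:  #{shallow[:offsets][:high]}"
puts "isolated high: #{isolated[:offsets][:high]}"
```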
#today=(sum) ⇒ Object
Sets the number of messages to dispatch today (accurate because it comes from the daily buffer).

    # File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 42

    def today=(sum)
      @daily[epoch_to_date(@started_at)] = sum
    end