Class: Karafka::Pro::ScheduledMessages::DailyBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/pro/scheduled_messages/daily_buffer.rb

Overview

Stores schedules for the current day and gives back those that should be dispatched We do not use min-heap implementation and just a regular hash because we want to be able to update the schedules based on the key as well as remove the schedules in case it would be cancelled. While removals could be implemented, updates with different timestamp would be more complex. At the moment a lookup of 8 640 000 messages (100 per second) takes up to 1.5 second, thus it is acceptable. Please ping me if you encounter performance issues with this naive implementation so it can be improved.

Instance Method Summary collapse

Constructor Details

#initializeDailyBuffer

Returns a new instance of DailyBuffer.



25
26
27
# File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 25

def initialize
  @accu = {}
end

Instance Method Details

#<<(message) ⇒ Object

Note:

Only messages for a given day should be added here.

Adds message to the buffer or removes the message from the buffer if it is a tombstone message for a given key

Parameters:



40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 40

def <<(message)
  # Non schedule are only tombstones and cancellations
  schedule = message.headers['schedule_source_type'] == 'schedule'

  key = message.key

  if schedule
    epoch = message.headers['schedule_target_epoch']
    @accu[key] = [epoch, message]
  else
    @accu.delete(key)
  end
end

#delete(key) ⇒ Object

Removes given key from the accumulator

Parameters:

  • key (String)

    key to remove



73
74
75
# File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 73

def delete(key)
  @accu.delete(key)
end

#for_dispatch {|epoch| ... } ⇒ Object

Note:

We yield epoch alongside of the message so we do not have to extract it several times later on. This simplifies the API

Yields messages that should be dispatched (sent) to Kafka

Yield Parameters:



61
62
63
64
65
66
67
68
69
# File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 61

def for_dispatch
  dispatch = Time.now.to_i

  @accu.each_value do |epoch, message|
    next unless epoch <= dispatch

    yield(epoch, message)
  end
end

#sizeInteger

Returns number of elements to schedule today.

Returns:

  • (Integer)

    number of elements to schedule today



30
31
32
# File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 30

def size
  @accu.size
end