Class: Karafka::Pro::ScheduledMessages::DailyBuffer
- Inherits: Object
  - Object
  - Karafka::Pro::ScheduledMessages::DailyBuffer
- Defined in: lib/karafka/pro/scheduled_messages/daily_buffer.rb
Overview
Stores schedules for the current day and gives back those that should be dispatched.

We use a regular hash rather than a min-heap implementation because we want to be able to update schedules by key as well as remove schedules that are cancelled. While removals could be implemented on a heap, updates that change the timestamp would be more complex. At the moment a lookup over 8 640 000 messages (100 per second for a full day) takes up to 1.5 seconds, which is acceptable. Please ping me if you encounter performance issues with this naive implementation so it can be improved.
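The linear lookup described above can be approximated outside of Karafka to get a feel for its cost. The following is only a rough sketch: `build_accu` and `count_due` are hypothetical helpers, the hash merely imitates the `key => [epoch, message]` layout, and the timing depends entirely on your hardware and Ruby version.

```ruby
require 'benchmark'

# Hypothetical helper: builds a hash shaped like the buffer's internal store,
# key => [epoch, payload], with epochs spread over one day from base_epoch.
def build_accu(count, base_epoch)
  accu = {}
  count.times do |i|
    accu["key-#{i}"] = [base_epoch + (i % 86_400), nil]
  end
  accu
end

# Hypothetical helper: linear scan counting the entries whose epoch is due.
def count_due(accu, now)
  due = 0
  accu.each_value do |epoch, _payload|
    due += 1 if epoch <= now
  end
  due
end

# 100 messages per second for a full day is 100 * 86_400 = 8_640_000 entries.
# A smaller count is used here so the sketch runs quickly; raise it to try the
# full-size scenario on your own machine.
accu = build_accu(100_000, 0)
elapsed = Benchmark.realtime { count_due(accu, 43_200) }
puts format('scanned %d entries in %.4fs', accu.size, elapsed)
```

Updating or cancelling a schedule in such a hash is a single assignment or `delete` by key, which is the property the overview cites as the reason for not using a heap.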
Instance Method Summary
- #<<(message) ⇒ Object
  Adds message to the buffer or removes the message from the buffer if it is a tombstone message for a given key.
- #delete(key) ⇒ Object
  Removes given key from the accumulator.
- #for_dispatch {|epoch| ... } ⇒ Object
  Yields messages that should be dispatched (sent) to Kafka.
- #initialize ⇒ DailyBuffer (constructor)
  A new instance of DailyBuffer.
- #size ⇒ Integer
  Number of elements to schedule today.
Constructor Details
#initialize ⇒ DailyBuffer
Returns a new instance of DailyBuffer.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 17

    def initialize
      @accu = {}
    end
Instance Method Details
#<<(message) ⇒ Object
Adds message to the buffer or removes the message from the buffer if it is a tombstone message for a given key.

Note: Only messages for a given day should be added here.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 32

    def <<(message)
      # Non schedule are only tombstones and cancellations
      schedule = message.headers['schedule_source_type'] == 'schedule'

      key = message.key

      if schedule
        epoch = message.headers['schedule_target_epoch']
        @accu[key] = [epoch, message]
      else
        @accu.delete(key)
      end
    end
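To illustrate how `#<<` treats schedules, updates and tombstones, here is a minimal sketch that runs without Karafka. `DemoMessage`, `DemoBuffer`, `demo_schedule`, `demo_tombstone` and `epoch_for` are hypothetical stand-ins introduced only for this example. The `'tombstone'` header value is illustrative as well, since any value other than `'schedule'` takes the removal branch. The sketch also assumes that the epoch header already holds an Integer, as the comparison in `#for_dispatch` suggests.

```ruby
# Stand-in for a consumed message; only the attributes used by #<< are modelled.
DemoMessage = Struct.new(:key, :headers)

# Minimal stand-in mirroring the documented #<< and #size (not the Karafka class).
class DemoBuffer
  def initialize
    @accu = {}
  end

  def size
    @accu.size
  end

  def <<(message)
    if message.headers['schedule_source_type'] == 'schedule'
      @accu[message.key] = [message.headers['schedule_target_epoch'], message]
    else
      @accu.delete(message.key)
    end
  end

  # Hypothetical reader, added for the demo only.
  def epoch_for(key)
    entry = @accu[key]
    entry && entry.first
  end
end

# Hypothetical builders for the two kinds of messages.
def demo_schedule(key, epoch)
  headers = { 'schedule_source_type' => 'schedule', 'schedule_target_epoch' => epoch }
  DemoMessage.new(key, headers)
end

def demo_tombstone(key)
  DemoMessage.new(key, { 'schedule_source_type' => 'tombstone' })
end

buffer = DemoBuffer.new
buffer << demo_schedule('a', 1_700_000_000)
buffer << demo_schedule('b', 1_700_000_500)
buffer << demo_schedule('a', 1_700_000_900) # same key: replaces the earlier entry
buffer << demo_tombstone('b')               # non-schedule: removes the entry
puts buffer.size
```

Note that `#<<` as listed returns the stored pair or the deleted value rather than the buffer itself, so calls should not be chained the way they can be with `Array#<<`.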
#delete(key) ⇒ Object
Removes given key from the accumulator.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 65

    def delete(key)
      @accu.delete(key)
    end
#for_dispatch {|epoch| ... } ⇒ Object
Yields messages that should be dispatched (sent) to Kafka.

Note: We yield the epoch alongside the message so we do not have to extract it several times later on. This simplifies the API.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 53

    def for_dispatch
      dispatch = Time.now.to_i

      @accu.each_value do |epoch, message|
        next unless epoch <= dispatch

        yield(epoch, message)
      end
    end
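The filter above can be tried in isolation. The sketch below is not part of Karafka: `each_due` is a hypothetical helper that applies the same `epoch <= now` check to a plain hash and takes the current time as an argument, which makes the behaviour easy to reproduce. It also shows one thing a caller might do with the yielded epoch, namely sort the due entries, since a Ruby hash is iterated in insertion order rather than in time order.

```ruby
# Hypothetical helper, not part of Karafka: same filter as #for_dispatch,
# but over a plain hash and with the current time passed in.
def each_due(accu, now = Time.now.to_i)
  return enum_for(:each_due, accu, now) unless block_given?

  accu.each_value do |epoch, message|
    next unless epoch <= now

    yield(epoch, message)
  end
end

accu = {
  'late' => [300, 'payload-late'],
  'early' => [100, 'payload-early'],
  'future' => [900, 'payload-future']
}

# Entries are yielded in insertion order; sorting on the yielded epoch gives
# a chronological order without reading the headers again.
due = each_due(accu, 500).sort_by { |epoch, _message| epoch }
due.each { |epoch, message| puts "#{epoch}: #{message}" }
```

Because the listed `#for_dispatch` only reads from the accumulator, entries stay in the buffer after being yielded until they are removed with `#delete` or by a non-schedule message passed to `#<<`.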
#size ⇒ Integer
Returns number of elements to schedule today.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 22

    def size
      @accu.size
    end