Class: Karafka::Connection::RawMessagesBuffer
- Inherits:
-
Object
- Object
- Karafka::Connection::RawMessagesBuffer
- Includes:
- Karafka::Core::Helpers::Time
- Defined in:
- lib/karafka/connection/raw_messages_buffer.rb
Overview
This buffer is NOT threadsafe.
We store data here in groups per topic partition to handle the revocation case, where we may need to remove messages from a single topic partition.
Buffer for raw librdkafka messages and eof status.
When a message is added to this buffer, it is appended to an array holding the other messages from the same topic and partition.
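The per-topic, per-partition grouping described above can be sketched with a nested hash whose default blocks create entries on first access. This is a minimal illustration only: `fake_message` is a hypothetical stand-in for `Rdkafka::Consumer::Message`, and the topic name is made up.

```ruby
# Hypothetical stand-in for Rdkafka::Consumer::Message (illustration only)
fake_message = Struct.new(:topic, :partition, :offset)

# Nested hash with default blocks: topic => partition => { eof:, messages: }
groups = Hash.new do |topic_groups, topic|
  topic_groups[topic] = Hash.new do |partition_groups, partition|
    partition_groups[partition] = { eof: false, messages: [] }
  end
end

[
  fake_message.new('events', 0, 10),
  fake_message.new('events', 1, 7),
  fake_message.new('events', 0, 11)
].each do |message|
  # Each message lands in the array for its own topic and partition
  groups[message.topic][message.partition][:messages] << message
end

offsets_p0 = groups['events'][0][:messages].map(&:offset)
offsets_p1 = groups['events'][1][:messages].map(&:offset)
```

Because every partition has its own entry, dropping a single revoked partition does not touch the messages of any other partition.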
Instance Attribute Summary
-
#last_polled_at ⇒ Float
readonly
Last polling time in milliseconds (monotonic).
-
#size ⇒ Object
readonly
Number of messages currently held in the buffer.
Instance Method Summary
-
#<<(message) ⇒ Array<Rdkafka::Consumer::Message>
Adds a message to the buffer.
-
#clear ⇒ Object
Removes all the data from the buffer.
-
#delete(topic, partition) ⇒ Object
Removes the given topic and partition data from the buffer. This is used when there is a partition revocation.
-
#each {|topic, partition, messages, eof| ... } ⇒ Object
Iterates over the messages of all topics and partitions.
-
#eof(topic, partition) ⇒ Object
Marks given topic partition as one that reached eof.
-
#initialize ⇒ Karafka::Connection::RawMessagesBuffer
constructor
Buffer instance.
-
#polled ⇒ Object
Marks the last polling time that can be accessed via #last_polled_at.
-
#uniq! ⇒ Object
Removes duplicated messages from the same partitions. This should be used only when a rebalance occurs, as we may again receive data we already have, due to the processing from the last offset.
Constructor Details
#initialize ⇒ Karafka::Connection::RawMessagesBuffer
Returns buffer instance.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 23

def initialize
  @size = 0
  @last_polled_at = monotonic_now

  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = {
        eof: false,
        messages: []
      }
    end
  end
end
Instance Attribute Details
#last_polled_at ⇒ Float (readonly)
Returns last polling time in milliseconds (monotonic).
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 20

def last_polled_at
  @last_polled_at
end
#size ⇒ Object (readonly)
Returns the number of messages currently held in the buffer.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 17

def size
  @size
end
Instance Method Details
#<<(message) ⇒ Array<Rdkafka::Consumer::Message>
Adds a message to the buffer.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 41

def <<(message)
  @size += 1

  partition_state = @groups[message.topic][message.partition]
  partition_state[:messages] << message
  partition_state[:eof] = false
end
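To illustrate how adding a message interacts with the eof flag, here is a small sketch that mirrors `#<<` and `#eof` from this page. `SketchBuffer`, its `eof?` reader, and `fake_message` are hypothetical names introduced only for this example; they are not part of Karafka.

```ruby
# Hypothetical stand-in for Rdkafka::Consumer::Message (illustration only)
fake_message = Struct.new(:topic, :partition, :offset)

# Simplified, hypothetical copy of the buffer with just enough behavior
class SketchBuffer
  attr_reader :size

  def initialize
    @size = 0
    @groups = Hash.new do |topic_groups, topic|
      topic_groups[topic] = Hash.new do |partition_groups, partition|
        partition_groups[partition] = { eof: false, messages: [] }
      end
    end
  end

  # Mirrors #<<: stores the message and resets the eof flag
  def <<(message)
    @size += 1
    state = @groups[message.topic][message.partition]
    state[:messages] << message
    state[:eof] = false
  end

  # Mirrors #eof: marks the topic partition as having reached eof
  def eof(topic, partition)
    @groups[topic][partition][:eof] = true
  end

  # Hypothetical reader, added here only so the flag can be inspected
  def eof?(topic, partition)
    @groups[topic][partition][:eof]
  end
end

buffer = SketchBuffer.new
buffer.eof('events', 0)
eof_before = buffer.eof?('events', 0)

buffer << fake_message.new('events', 0, 42)
eof_after = buffer.eof?('events', 0)
```

In this sketch a partition marked as eof stops being eof as soon as a new message for it arrives, which matches the `partition_state[:eof] = false` assignment in `#<<`.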
#clear ⇒ Object
We do not clear the whole groups hash; instead we clear the partition hashes, which saves us some object allocations. We cannot clear the underlying arrays because they may be in use by other threads for data processing: clearing one could wipe the raw messages array of a job that is still in the jobs queue.
We do not clear the eof assignments because they can span multiple batch polls. Since eof is not raised non-stop and is silenced after an eof poll, cleaning it here would lose that information. The eof state should be reset when we discover new messages for the given topic partition.
Removes all the data from the buffer.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 116

def clear
  @size = 0
  @groups.each_value(&:clear)
end
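The reasoning about not clearing the underlying arrays can be shown with plain hashes. This is a sketch with made-up data, not the buffer itself: it only demonstrates that `each_value(&:clear)` empties the per-topic partition hashes while an array reference handed out earlier keeps its contents.

```ruby
# Plain-hash stand-in for the buffer's internal groups (made-up data)
groups = {
  'events' => {
    0 => { eof: false, messages: %i[m1 m2] }
  }
}

# Pretend a job already received this array reference for processing
handed_out = groups['events'][0][:messages]

# Same call as in #clear: empties each topic's partition hash
groups.each_value(&:clear)

topics_kept = groups.keys
partitions_left = groups['events'].size
```

After the call, the topic key is still present, its partition hash is empty, and `handed_out` still holds both messages because the array itself was never mutated.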
#delete(topic, partition) ⇒ Object
Removes the given topic and partition data from the buffer. This is used when there is a partition revocation.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 78

def delete(topic, partition)
  return unless @groups.key?(topic)
  return unless @groups.fetch(topic).key?(partition)

  topic_data = @groups.fetch(topic)
  topic_data.delete(partition)

  recount!

  # If there are no more partitions to handle in a given topic, remove it completely
  @groups.delete(topic) if topic_data.empty?
end
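The revocation flow can be traced with a standalone sketch. The `recount` lambda below is an assumption: the private `recount!` helper is not shown on this page, so the sketch simply sums the sizes of all message arrays. The names `revoke` and `recount` and the sample data are hypothetical.

```ruby
# Plain-hash stand-in for the buffer's internal groups (made-up data)
groups = {
  'events' => {
    0 => { eof: false, messages: %w[a b] },
    1 => { eof: false, messages: %w[c] }
  }
}
size = 3

# Assumed behavior of recount!: total number of buffered messages
recount = lambda do
  groups.each_value.sum do |partitions|
    partitions.each_value.sum { |details| details[:messages].size }
  end
end

# Mirrors #delete: guard clauses, partition removal, recount, topic cleanup
revoke = lambda do |topic, partition|
  return unless groups.key?(topic)
  return unless groups.fetch(topic).key?(partition)

  topic_data = groups.fetch(topic)
  topic_data.delete(partition)
  size = recount.call

  # Drop the topic entirely once its last partition is gone
  groups.delete(topic) if topic_data.empty?
end

revoke.call('events', 1)
size_after_first = size
topics_after_first = groups.keys

revoke.call('events', 0)
size_after_second = size

# Unknown topic: the guard clauses make this a no-op
revoke.call('missing', 0)
```

Removing the last partition of a topic also removes the topic key, so later iteration does not visit empty topics.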
#each {|topic, partition, messages, eof| ... } ⇒ Object
Iterates over the messages of all topics and partitions, yielding the topic, the partition, its messages, and its eof flag.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 66

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof])
    end
  end
end
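As a rough illustration of the four values passed to the block, the following sketch walks a plain nested hash in the same order as `#each`. The data is made up and the hash stands in for the buffer's internal state.

```ruby
# Plain-hash stand-in for the buffer's internal groups (made-up data)
groups = {
  'events' => {
    0 => { eof: false, messages: %w[a b] },
    1 => { eof: true, messages: [] }
  }
}

rows = []

# Same traversal as #each: topic, partition, messages, eof flag
groups.each do |topic, partitions|
  partitions.each do |partition, details|
    rows << [topic, partition, details[:messages].size, details[:eof]]
  end
end
```

Note that a partition can appear with an empty messages array and `eof` set to true, as partition 1 does here.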
#eof(topic, partition) ⇒ Object
Marks the given topic partition as one that reached eof.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 51

def eof(topic, partition)
  @groups[topic][partition][:eof] = true
end
#polled ⇒ Object
Marks the last polling time that can be accessed via #last_polled_at.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 56

def polled
  @last_polled_at = monotonic_now
end
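A monotonic millisecond timestamp like the one stored in `#last_polled_at` can be used to measure the time since the last poll. The sketch below assumes `monotonic_now` behaves like Ruby's monotonic clock read in float milliseconds; `monotonic_now_ms` is a hypothetical helper written for this example, not the method from `Karafka::Core::Helpers::Time`.

```ruby
# Hypothetical helper: monotonic clock reading in float milliseconds.
# Assumption: this approximates what monotonic_now returns.
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Equivalent of calling #polled
last_polled_at = monotonic_now_ms

sleep(0.01)

# Time elapsed since the last poll, in milliseconds
elapsed_ms = monotonic_now_ms - last_polled_at
```

A monotonic clock is not affected by wall-clock adjustments, so the difference between two readings is a reliable elapsed time.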
#uniq! ⇒ Object
Removes duplicated messages from the same partitions. This should be used only when a rebalance occurs, as we may again receive data we already have, due to the processing from the last offset. In such cases we may get the same data again, and we want to keep duplication to a minimum.
# File 'lib/karafka/connection/raw_messages_buffer.rb', line 95

def uniq!
  @groups.each_value do |partitions|
    partitions.each_value do |details|
      details[:messages].uniq!(&:offset)
    end
  end

  recount!
end
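The deduplication step relies on `Array#uniq!` with a block, which keeps the first element for each distinct block result. A minimal sketch for a single partition's array, with `fake_message` as a hypothetical stand-in for `Rdkafka::Consumer::Message`:

```ruby
# Hypothetical stand-in for Rdkafka::Consumer::Message (illustration only)
fake_message = Struct.new(:offset, :payload)

# Messages of one partition; offset 5 was delivered twice
messages = [
  fake_message.new(5, 'a'),
  fake_message.new(6, 'b'),
  fake_message.new(5, 'a'),
  fake_message.new(7, 'c')
]

# Same call as in #uniq!: one message per offset, first occurrence wins
messages.uniq!(&:offset)

remaining_offsets = messages.map(&:offset)
```

Since offsets are unique only within one partition, the uniqueness check is applied to each partition's array separately, as in the method above.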