Class: Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Inherits: Object
  - Object
    - Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Defined in: lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb
Overview
Note: We still use the regular coordinator “real” offset management, because we want to keep the two as separate as possible: the real seek offset management is also used for pausing, filtering and other features, and it should not be impacted by the virtual one.
Note: This manager is not thread-safe by itself. It should only be used from coordinator-locked locations.
Manager that keeps track of our offsets within the virtualization layer, local to a given partition assignment. It simplifies offset management for virtual partitions, as it lets us mark messages as virtually consumed and move the real offset along behind them as expected.
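The private method that recomputes the real offset (`materialize_real_offset`, called at the end of `#mark` and `#mark_until`) is not documented on this page. As a hedged sketch, assuming the real offset is the highest offset for which that offset and every lower registered offset have been virtually marked, the rule could look like this. The `materialize_real_offset_sketch` name is hypothetical:

```ruby
# Hypothetical sketch: compute the "real" offset from a hash of
# offset => virtually-marked flag. Returns -1 when nothing can be committed.
def materialize_real_offset_sketch(marked)
  real_offset = -1

  # Walk offsets in ascending order and stop at the first unmarked one
  marked.sort_by(&:first).each do |offset, status|
    break unless status

    real_offset = offset
  end

  real_offset
end

# Two virtual consumers received interleaved offsets: [0, 2, 4] and [1, 3, 5]
marked = { 0 => true, 2 => true, 4 => false, 1 => false, 3 => false, 5 => false }
materialize_real_offset_sketch(marked) # => 0 (offset 1 is still pending)

marked[1] = marked[3] = true
materialize_real_offset_sketch(marked) # => 3
```

Under this assumption, one virtual consumer racing ahead cannot move the real offset past work another virtual consumer has not finished yet.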
Instance Attribute Summary
- #groups ⇒ Object (readonly)
  Returns the value of attribute groups.
Instance Method Summary
- #clear ⇒ Object
  Clears the manager for the next collective operation.
- #initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager (constructor)
  A new instance of VirtualOffsetManager.
- #mark(message, offset_metadata) ⇒ Object
  Marks the given message as virtually consumed.
- #mark_until(message, offset_metadata) ⇒ Object
  Marks all offsets from all groups up to and including the given message.
- #markable ⇒ Array<Messages::Seek, String>
  Markable message for real offset marking and its associated metadata.
- #markable? ⇒ Boolean
  Whether there is a real offset we can mark as consumed.
- #marked ⇒ Array<Integer>
  Offsets of messages already marked as consumed virtually.
- #register(offsets_group) ⇒ Object
  Registers an offset group coming from one virtual consumer.
Constructor Details
#initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager
Note: We need topic and partition because we use a (virtual) seek message for real offset management. We could keep a reference to the real message, but this can be memory-consuming and is not worth it.
Returns a new instance of VirtualOffsetManager.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 33

def initialize(topic, partition, offset_metadata_strategy)
  @topic = topic
  @partition = partition
  @groups = []
  @marked = {}
  @offsets_metadata = {}
  @real_offset = -1
  @offset_metadata_strategy = offset_metadata_strategy
  @current_offset_metadata = nil
end
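To illustrate the trade-off in the constructor note, here is a minimal sketch that keeps only the topic, partition and offset needed to seek, instead of a reference to the full message. `FullMessage`, `SeekRef` and `seek_for` are hypothetical stand-ins, not Karafka classes; the real code builds a `Messages::Seek.new(@topic, @partition, @real_offset)` in `#markable`.

```ruby
# Hypothetical stand-ins: a full message carries its payload, while a seek
# reference only needs enough data to point at a position in the partition.
FullMessage = Struct.new(:topic, :partition, :offset, :raw_payload)
SeekRef = Struct.new(:topic, :partition, :offset)

# Topic and partition are known up front, so only the offset has to be
# tracked; the reference is built from it when needed
def seek_for(topic, partition, real_offset)
  SeekRef.new(topic, partition, real_offset)
end

message = FullMessage.new("events", 0, 42, "x" * 1_000_000)
seek = seek_for("events", 0, message.offset)

seek.to_a # => ["events", 0, 42]
```

The seek reference does not hold on to the payload, so the large message can be garbage-collected once processing is done.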
Instance Attribute Details
#groups ⇒ Object (readonly)
Returns the value of attribute groups.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 22

def groups
  @groups
end
Instance Method Details
#clear ⇒ Object
Clears the manager for the next collective operation.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 45

def clear
  @groups.clear
  @offsets_metadata.clear
  @current_offset_metadata = nil
  @marked.clear
  @real_offset = -1
end
#mark(message, offset_metadata) ⇒ Object
Marks the given message as virtually consumed. We mark the given message offset and all earlier offsets from the same group as done, and then refresh our real offset representation, as it might have moved to a newer real offset.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 70

def mark(message, offset_metadata)
  offset = message.offset

  # Store metadata when we materialize the most stable offset
  @offsets_metadata[offset] = offset_metadata
  @current_offset_metadata = offset_metadata

  group = @groups.find { |reg_group| reg_group.include?(offset) }

  # This case can happen when someone uses MoM and wants to mark a message from a previous
  # batch as consumed. We can add it, since the real offset refresh will point to it
  unless group
    group = [offset]
    @groups << group
  end

  position = group.index(offset)

  # Mark all previous messages from the same group also as virtually consumed
  group[0..position].each do |markable_offset|
    # Give earlier messages the metadata of this (higher) offset,
    # unless different metadata was already set for them explicitly
    @offsets_metadata[markable_offset] ||= offset_metadata
    @marked[markable_offset] = true
  end

  # Recompute the real offset representation
  materialize_real_offset
end
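A minimal sketch of the core of `#mark` with the metadata handling left out: find the group that owns the offset, fall back to a single-element group for an offset that was never registered, and mark the group prefix. `mark_in_groups` is a hypothetical helper working on plain arrays and hashes, not part of Karafka.

```ruby
# Hypothetical standalone version of the group-prefix marking in #mark.
# groups: array of offset arrays (one per virtual consumer)
# marked: hash of offset => boolean
def mark_in_groups(groups, marked, offset)
  group = groups.find { |reg_group| reg_group.include?(offset) }

  # Offset outside the registered groups (e.g. from a previous batch):
  # track it as a new single-element group
  unless group
    group = [offset]
    groups << group
  end

  position = group.index(offset)

  # Everything up to and including this offset in the same group is done
  group[0..position].each { |markable_offset| marked[markable_offset] = true }

  marked
end

groups = [[0, 2, 4], [1, 3, 5]]
marked = (0..5).to_h { |offset| [offset, false] }

mark_in_groups(groups, marked, 4)
marked.select { |_, status| status }.keys.sort # => [0, 2, 4]
```

Offsets 1, 3 and 5, which belong to the other virtual consumer, stay unmarked.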
#mark_until(message, offset_metadata) ⇒ Object
Marks all offsets from all groups up to and including the given message. Useful when operating in a collapsed state for marking.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 104

def mark_until(message, offset_metadata)
  mark(message, offset_metadata)

  @groups.each do |group|
    group.each do |offset|
      next if offset > message.offset

      @offsets_metadata[offset] = offset_metadata
      @marked[offset] = true
    end
  end

  materialize_real_offset
end
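Unlike `#mark`, `#mark_until` crosses group boundaries. The sweep it performs after delegating to `#mark` can be sketched with a hypothetical `mark_until_in_groups` helper; the sketch leaves out the `mark` call and the metadata assignment that the real method also performs.

```ruby
# Hypothetical standalone version of the sweep in #mark_until: every
# registered offset at or below the target is marked, regardless of group.
def mark_until_in_groups(groups, marked, target_offset)
  groups.each do |group|
    group.each do |offset|
      next if offset > target_offset

      marked[offset] = true
    end
  end

  marked
end

groups = [[0, 2, 4], [1, 3, 5]]
marked = (0..5).to_h { |offset| [offset, false] }

mark_until_in_groups(groups, marked, 3)
marked.select { |_, status| status }.keys.sort # => [0, 1, 2, 3]
```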
#markable ⇒ Array<Messages::Seek, String>
Returns the markable message for real offset marking and its associated metadata.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 132

def markable
  raise Errors::InvalidRealOffsetUsageError unless markable?

  offset_metadata = case @offset_metadata_strategy
                    when :exact
                      @offsets_metadata.fetch(@real_offset)
                    when :current
                      @current_offset_metadata
                    else
                      raise Errors::UnsupportedCaseError, @offset_metadata_strategy
                    end

  [
    Messages::Seek.new(
      @topic,
      @partition,
      @real_offset
    ),
    offset_metadata
  ]
end
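The metadata returned by `#markable` depends on the `offset_metadata_strategy` passed to the constructor. The helper below is a hypothetical extraction of that `case` expression; it raises `ArgumentError` where the real code raises `Errors::UnsupportedCaseError`.

```ruby
# Hypothetical standalone version of the metadata selection in #markable.
# :exact   -> metadata stored for the real offset itself
# :current -> the most recently provided metadata
def pick_offset_metadata(strategy, offsets_metadata, current_offset_metadata, real_offset)
  case strategy
  when :exact
    offsets_metadata.fetch(real_offset)
  when :current
    current_offset_metadata
  else
    raise ArgumentError, "unsupported strategy: #{strategy.inspect}"
  end
end

offsets_metadata = { 0 => "a", 1 => "b", 2 => "c" }

# The real offset is 1, but offset 2 carried the most recent metadata
pick_offset_metadata(:exact, offsets_metadata, "c", 1)   # => "b"
pick_offset_metadata(:current, offsets_metadata, "c", 1) # => "c"
```

In this sketch, `Hash#fetch` raises `KeyError` if no metadata entry exists for the real offset.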
#markable? ⇒ Boolean
Returns whether there is a real offset we can mark as consumed.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 126

def markable?
  !@real_offset.negative?
end
#marked ⇒ Array<Integer>
Returns offsets of messages already marked as consumed virtually.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 120

def marked
  @marked.select { |_, status| status }.map(&:first).sort
end
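The `#marked` pipeline in isolation, run on a hand-written hash: registered but not yet consumed offsets (stored as `false`) are dropped, and the rest come back in ascending order regardless of insertion order.

```ruby
# Offsets registered for the current batch: true once virtually consumed
marked = { 5 => true, 3 => false, 1 => true, 4 => true }

# Same pipeline as #marked: keep marked entries, take the offsets, sort them
marked.select { |_, status| status }.map(&:first).sort # => [1, 4, 5]
```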
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer. In order to move the real underlying offset accordingly, we need to track the virtual consumers' offset groups independently and only materialize the end result.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 58

def register(offsets_group)
  @groups << offsets_group

  offsets_group.each { |offset| @marked[offset] = false }
end
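To see how `#register`, `#mark` and `#clear` fit together across one batch, here is a simplified stand-in. `MiniVirtualOffsetManager` is hypothetical: it omits offset metadata, and because the private materialization method is not shown on this page, its `materialize_real_offset` assumes the real offset is the highest offset with no unmarked registered offset at or below it.

```ruby
# Hypothetical, simplified stand-in for the manager (no metadata handling,
# assumed materialization rule).
class MiniVirtualOffsetManager
  attr_reader :groups, :real_offset

  def initialize
    @groups = []
    @marked = {}
    @real_offset = -1
  end

  # Each virtual consumer registers its own group; every offset starts unmarked
  def register(offsets_group)
    @groups << offsets_group
    offsets_group.each { |offset| @marked[offset] = false }
  end

  # Marks the offset and all earlier offsets from the same group
  def mark(offset)
    group = @groups.find { |reg_group| reg_group.include?(offset) }

    unless group
      group = [offset]
      @groups << group
    end

    group[0..group.index(offset)].each { |markable_offset| @marked[markable_offset] = true }
    materialize_real_offset
  end

  def markable?
    !@real_offset.negative?
  end

  def clear
    @groups.clear
    @marked.clear
    @real_offset = -1
  end

  private

  # Assumed rule: advance through offsets in order until the first unmarked one
  def materialize_real_offset
    @marked.sort_by(&:first).each do |offset, status|
      break unless status

      @real_offset = offset
    end
  end
end

manager = MiniVirtualOffsetManager.new
manager.register([0, 2, 4]) # virtual consumer A
manager.register([1, 3, 5]) # virtual consumer B

manager.mark(2)
manager.real_offset # => 0 (offset 1 from consumer B is still pending)
manager.mark(5)
manager.real_offset # => 3 (offset 4 from consumer A is still pending)
manager.mark(4)
manager.real_offset # => 5
manager.clear
manager.markable?   # => false
```

Tracking each virtual consumer's group separately is what lets one consumer's progress be recorded without moving the real offset past work the other consumer still has to finish.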