Class: Karafka::Pro::Processing::Collapser
Inherits: Object
- Object
- Karafka::Pro::Processing::Collapser
Defined in: lib/karafka/pro/processing/collapser.rb
Overview
Manages the collapse of virtual partitions. Since any non-virtual partition is effectively a virtual partition of size one, we can use it generically without having to distinguish between the two cases.
We need to track the offset up to which we want to stay collapsed because, upon pause and retry, rdkafka may purge the buffer. This means that we may end up with a smaller or larger (different) dataset, and without tracking the end of the collapse, the collapsed state could flicker. Tracking ensures that the collapse stays in effect until all the messages from the corrupted batch are processed.
Instance Method Summary
- #collapse_until!(offset) ⇒ Object
  Collapse until given offset.
- #collapsed? ⇒ Boolean
  Should we collapse into a single consumer.
- #initialize ⇒ Collapser (constructor)
  When initialized, nothing is collapsed.
- #refresh!(first_offset) ⇒ Object
  Sets the collapse state based on the first collective offset that we are going to process and makes the decision whether or not we need to still keep the collapse.
Constructor Details
#initialize ⇒ Collapser
When initialized, nothing is collapsed.
# File 'lib/karafka/pro/processing/collapser.rb', line 28
def initialize
  @collapsed = false
  @until_offset = -1
  @mutex = Mutex.new
end
Instance Method Details
#collapse_until!(offset) ⇒ Object
Collapse until given offset. We keep collapsing until the given offset, or a larger one, is encountered.
# File 'lib/karafka/pro/processing/collapser.rb', line 42
def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case after a pause and re-fetch we would get less messages and
    # one of them would cause an error. We do not want to overwrite the offset here unless
    # it is bigger.
    @until_offset = offset if offset > @until_offset
  end
end
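The guard in collapse_until! means the recorded boundary can only move forward. The following is a minimal sketch of that behavior, not the library class itself: OffsetTrackingSketch and its until_offset reader are hypothetical names added for illustration, and the offsets are arbitrary values.

```ruby
# Stand-in that mirrors only the offset tracking shown above. The class name
# and the until_offset reader are hypothetical and exist for illustration.
class OffsetTrackingSketch
  attr_reader :until_offset

  def initialize
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      # Only ever raise the boundary, never lower it
      @until_offset = offset if offset > @until_offset
    end
  end
end

tracker = OffsetTrackingSketch.new

tracker.collapse_until!(100) # boundary is recorded as 100
tracker.collapse_until!(60)  # smaller offset: ignored, boundary stays at 100
after_smaller = tracker.until_offset

tracker.collapse_until!(150) # larger offset: boundary moves forward
after_larger = tracker.until_offset

puts after_smaller # prints 100
puts after_larger  # prints 150
```

Keeping the maximum is what protects against a smaller re-fetched batch shortening a collapse that an earlier, larger batch had requested.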
#collapsed? ⇒ Boolean
Returns whether we should collapse into a single consumer.
# File 'lib/karafka/pro/processing/collapser.rb', line 35
def collapsed?
  @collapsed
end
#refresh!(first_offset) ⇒ Object
Sets the collapse state based on the first collective offset that we are going to process. The collapse stays in effect while that offset is lower than the offset recorded via #collapse_until! and is lifted otherwise.
# File 'lib/karafka/pro/processing/collapser.rb', line 54
def refresh!(first_offset)
  @mutex.synchronize do
    @collapsed = first_offset < @until_offset
  end
end
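To see how the three methods interact, here is a small walk-through. It is a sketch under stated assumptions: CollapserSketch is a hypothetical stand-alone copy of the methods listed on this page, so it runs without Karafka Pro, and the offsets are arbitrary. How and when Karafka itself invokes these methods is not shown here.

```ruby
# Hypothetical stand-alone copy of the methods documented above
class CollapserSketch
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapsed?
    @collapsed
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = CollapserSketch.new
states = []

states << collapser.collapsed? # false: nothing is collapsed initially

collapser.collapse_until!(100)
states << collapser.collapsed? # false: only the boundary was recorded

collapser.refresh!(40)
states << collapser.collapsed? # true: 40 is below the boundary of 100

collapser.refresh!(100)
states << collapser.collapsed? # false: 100 is not below 100

p states # prints [false, false, true, false]
```

Note that collapse_until! on its own does not flip the state; the state only changes on the next refresh!, and the comparison is strict, so a batch starting exactly at the recorded offset is no longer collapsed.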