Class: Karafka::Pro::Processing::Collapser

Inherits:
Object
Defined in:
lib/karafka/pro/processing/collapser.rb

Overview

Manages the collapse of virtual partitions. Since any non-virtual partition is effectively a virtual partition of size one, the collapser can be used generically, without distinguishing between the two cases.

We need to track the offset until which we want to collapse, because upon pause and retry rdkafka may purge its buffer. As a result, we may end up with a smaller or bigger (different) dataset, and without tracking where the collapse ends, the collapse state could flicker between batches. Tracking ensures that the collapse continues until all the messages from the corrupted batch have been processed.
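
The Ruby sketch below walks through that scenario. It uses a standalone copy of the three methods listed on this page, so it runs without Karafka Pro installed. The offsets, and the choice to pass the offset right after the failed batch to `collapse_until!`, are illustrative assumptions rather than documented Karafka behavior.

```ruby
# Standalone copy of the methods documented on this page, so the sketch
# runs without Karafka Pro.
class Collapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  def collapsed?
    @collapsed
  end

  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = Collapser.new

# Assumption for this example: a batch covering offsets 100..109 failed, so we
# collapse until the offset right after it.
collapser.collapse_until!(110)

# After pause and retry, rdkafka may hand us differently sized batches.
# Each value is the first offset of a batch we are about to process.
decisions = [100, 105, 110].map do |first_offset|
  collapser.refresh!(first_offset)
  [first_offset, collapser.collapsed?]
end

decisions.each { |offset, collapsed| puts "batch from #{offset}: collapsed=#{collapsed}" }
```

Even if the retried batch is smaller (say it ends at 104 instead of 109), the batch starting at 105 is still processed collapsed; the collapse ends only once processing reaches offset 110.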


Constructor Details

#initialize ⇒ Collapser

When initialized, nothing is collapsed.

# File 'lib/karafka/pro/processing/collapser.rb', line 20

def initialize
  @collapsed = false
  @until_offset = -1
  @mutex = Mutex.new
end

Instance Method Details

#collapse_until!(offset) ⇒ Object

Collapses until the given offset. The collapse is kept until a batch starting at this offset, or at a higher one, is encountered.

Parameters:

  • offset (Integer)

    offset until which we keep the collapse

# File 'lib/karafka/pro/processing/collapser.rb', line 34

def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case after a pause and re-fetch we would get fewer messages and
    # one of them would cause an error. We do not want to overwrite the offset here unless
    # it is bigger.
    @until_offset = offset if offset > @until_offset
  end
end
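
A small Ruby sketch of the "only keep the bigger offset" rule: several threads call `collapse_until!` with offsets in arbitrary order, and only the largest one survives. It reuses a standalone copy of the class shown on this page; the offsets and the use of raw threads are illustrative assumptions, not how Karafka itself invokes this method.

```ruby
# Standalone copy of the documented class (not loaded from Karafka Pro).
class Collapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  def collapsed?
    @collapsed
  end

  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = Collapser.new

# Hypothetical: several workers report failures at different offsets concurrently.
threads = [40, 120, 75, 90, 10].map do |offset|
  Thread.new { collapser.collapse_until!(offset) }
end
threads.each(&:join)

# Only the largest reported offset (120) is kept.
collapser.refresh!(119)
collapsed_at_119 = collapser.collapsed?

collapser.refresh!(120)
collapsed_at_120 = collapser.collapsed?

# A later, smaller offset does not shorten the collapse window.
collapser.collapse_until!(50)
collapser.refresh!(100)
collapsed_at_100 = collapser.collapsed?
```

The mutex makes the compare-and-assign atomic, so the result does not depend on the order in which the threads run.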

#collapsed? ⇒ Boolean

Returns whether we should collapse into a single consumer.

Returns:

  • (Boolean)

    Whether we should collapse into a single consumer

# File 'lib/karafka/pro/processing/collapser.rb', line 27

def collapsed?
  @collapsed
end
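
To illustrate what the returned Boolean is for, here is a hypothetical Ruby helper (not part of Karafka) that splits message offsets across several virtual-partition buckets or, when collapsed, hands them all to a single consumer. The round-robin split is an assumption chosen for brevity; real virtual partitioning in Karafka is driven by its own configuration, which this sketch does not model.

```ruby
# Hypothetical helper, not a Karafka API: groups offsets into one bucket per
# virtual partition, or into a single bucket when collapsed.
def distribute(offsets, collapsed:, virtual_partitions:)
  # Collapsed: everything goes to a single consumer.
  return [offsets] if collapsed

  # Not collapsed: spread offsets round-robin across the virtual partitions.
  buckets = Array.new(virtual_partitions) { [] }
  offsets.each_with_index do |offset, index|
    buckets[index % virtual_partitions] << offset
  end
  buckets.reject(&:empty?)
end

offsets = (100..105).to_a
parallel = distribute(offsets, collapsed: false, virtual_partitions: 3)
single = distribute(offsets, collapsed: true, virtual_partitions: 3)

p parallel
p single
```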

#refresh!(first_offset) ⇒ Object

Sets the collapse state based on the first offset of the collective batch we are about to process, deciding whether the collapse still needs to be kept.

Parameters:

  • first_offset (Integer)

    first offset from a collective batch

# File 'lib/karafka/pro/processing/collapser.rb', line 46

def refresh!(first_offset)
  @mutex.synchronize do
    @collapsed = first_offset < @until_offset
  end
end
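
The decision in `refresh!` is a strict less-than comparison against the stored offset, recomputed on every call. The Ruby sketch below, again using a standalone copy of the class, tabulates a few first offsets; the specific values are arbitrary.

```ruby
# Standalone copy of the documented class (not loaded from Karafka Pro).
class Collapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  def collapsed?
    @collapsed
  end

  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = Collapser.new

# With the initial @until_offset of -1, no Kafka offset (always >= 0) collapses.
collapser.refresh!(0)
initially_collapsed = collapser.collapsed?

collapser.collapse_until!(20)

# The flag is recomputed from scratch on each call; @until_offset is never reset.
results = [5, 19, 20, 25, 5].map do |first_offset|
  collapser.refresh!(first_offset)
  collapser.collapsed?
end

p initially_collapsed
p results
```

A batch starting exactly at the stored offset (20) is not collapsed. Because `@until_offset` only ever grows, seeing an earlier first offset again (the final 5) turns the collapse back on.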