Class: Karafka::Pro::Processing::Collapser

Inherits:
Object
Defined in:
lib/karafka/pro/processing/collapser.rb

Overview

Manages the collapse of virtual partitions. Since any non-virtual partition is effectively a virtual partition of size one, the collapser can be used generically without distinguishing between the two cases.

We need to track the offset until which the collapse should last because, upon pause and retry, rdkafka may purge its buffer. This means that the re-fetched dataset may be smaller or larger than the original one, and without tracking the end of the collapse, the collapsed state could flicker. Tracking ensures that the collapse stays in effect until all the messages from the corrupted batch are processed.
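The lifecycle described above can be sketched as follows. `SketchCollapser` is a hypothetical stand-in that copies the method bodies documented on this page so the example runs without Karafka Pro; it is not the library class itself. The offsets, and the choice to pass one past the last offset of the failed batch, are assumptions made for illustration only.

```ruby
# Stand-in mirroring the documented method bodies (hypothetical name).
class SketchCollapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapsed?
    @collapsed
  end

  def collapse_until!(offset)
    @mutex.synchronize { @until_offset = offset if offset > @until_offset }
  end

  def refresh!(first_offset)
    @mutex.synchronize { @collapsed = first_offset < @until_offset }
  end
end

collapser = SketchCollapser.new

# Assumed scenario: a batch covering offsets 100..149 failed, and the caller
# asks for a collapse until one past its last offset.
collapser.collapse_until!(150)

# After the pause, the re-fetched batch is smaller and starts at 100.
collapser.refresh!(100)
puts collapser.collapsed? # true

# The next batch starts at 120, still inside the failed range.
collapser.refresh!(120)
puts collapser.collapsed? # true

# A batch starting at 150 is past the failed range, so the collapse ends.
collapser.refresh!(150)
puts collapser.collapsed? # false
```

Because the end of the collapse is tied to an offset rather than to a batch count, the state stays stable no matter how the re-fetched data is split into batches.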

Instance Method Summary

Constructor Details

#initialize ⇒ Collapser

When initialized, nothing is collapsed.



# File 'lib/karafka/pro/processing/collapser.rb', line 28

def initialize
  @collapsed = false
  @until_offset = -1
  @mutex = Mutex.new
end

Instance Method Details

#collapse_until!(offset) ⇒ Object

Collapses until the given offset. The collapse stays in effect until the given offset, or a larger one, is encountered.

Parameters:

  • offset (Integer)

    offset until which we keep the collapse



# File 'lib/karafka/pro/processing/collapser.rb', line 42

def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case, after a pause and re-fetch, we get fewer messages and
    # one of them causes an error. We do not want to overwrite the offset here unless
    # the new one is bigger.
    @until_offset = offset if offset > @until_offset
  end
end
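A minimal sketch of the "only ever move forward" behaviour follows. `UntilOffsetSketch` is a hypothetical stand-in that copies the `collapse_until!` body above; the `until_offset` reader is added purely so the example can print the value and is not part of the documented class. The offsets and the use of threads are illustrative assumptions.

```ruby
# Stand-in containing only the offset-tracking part (hypothetical name).
class UntilOffsetSketch
  # Reader added only for this illustration.
  attr_reader :until_offset

  def initialize
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end
end

tracker = UntilOffsetSketch.new

# Several threads report different offsets, in no guaranteed order.
[200, 320, 150, 275].map do |offset|
  Thread.new { tracker.collapse_until!(offset) }
end.each(&:join)

puts tracker.until_offset # 320

# A later, smaller offset (for example from a shorter re-fetched batch)
# does not shrink the tracked range.
tracker.collapse_until!(180)
puts tracker.until_offset # 320
```

Whatever order the calls arrive in, the tracked value ends up as the largest offset reported, which is what keeps a shorter retry batch from ending the collapse early.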

#collapsed? ⇒ Boolean

Returns whether processing should collapse into a single consumer.

Returns:

  • (Boolean)

    whether processing should collapse into a single consumer



# File 'lib/karafka/pro/processing/collapser.rb', line 35

def collapsed?
  @collapsed
end

#refresh!(first_offset) ⇒ Object

Sets the collapse state based on the first offset of the collective batch that we are about to process, deciding whether the collapse still needs to be kept.

Parameters:

  • first_offset (Integer)

    first offset from a collective batch



# File 'lib/karafka/pro/processing/collapser.rb', line 54

def refresh!(first_offset)
  @mutex.synchronize do
    @collapsed = first_offset < @until_offset
  end
end
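The comparison in `refresh!` is strict, so the boundary cases are worth spelling out. The helper below, `collapsed_for?`, is a hypothetical pure function that mirrors only that comparison; the offsets are made up for illustration.

```ruby
# Hypothetical helper mirroring the comparison inside refresh!.
def collapsed_for?(first_offset, until_offset)
  first_offset < until_offset
end

until_offset = 150

[100, 149, 150, 151].each do |first_offset|
  puts "#{first_offset}: #{collapsed_for?(first_offset, until_offset)}"
end
# 100: true
# 149: true
# 150: false
# 151: false

# With the initial until_offset of -1, no non-negative offset is below it,
# so a freshly initialized collapser never reports a collapse.
puts collapsed_for?(0, -1) # false
```

A batch whose first offset equals the tracked offset is therefore already processed without collapsing.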