Class: Karafka::Pro::Processing::Partitioner

Inherits:
Karafka::Processing::Partitioner
Defined in:
lib/karafka/pro/processing/partitioner.rb

Overview

Pro partitioner that can distribute work based on the virtual partitioner settings
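Virtual partitions are enabled per topic in the routing. A minimal sketch based on the public Karafka Pro routing DSL (the topic, consumer and header names below are illustrative assumptions, not part of this class):

class KarafkaApp < Karafka::App
  routes.draw do
    topic :orders_states do
      consumer OrdersStatesConsumer

      # Spread one Kafka partition's batch across up to 5 virtual partitions,
      # grouping messages by a key extracted from each message
      virtual_partitions(
        partitioner: ->(message) { message.headers['order_id'] },
        max_partitions: 5
      )
    end
  end
end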

Instance Method Summary

#call(topic, messages, coordinator) ⇒ Object
Distributes the messages of a batch into virtual partition groups and yields each group.

Methods inherited from Karafka::Processing::Partitioner

#initialize

Constructor Details

This class inherits a constructor from Karafka::Processing::Partitioner

Instance Method Details

#call(topic, messages, coordinator) {|group, messages| ... } ⇒ Object

Parameters:

topic (String) - name of the topic for which we partition the incoming messages

messages (Array<Karafka::Messages::Message>) - karafka messages to distribute

coordinator (Karafka::Pro::Processing::Coordinator) - processing coordinator of the given topic partition

Yield Parameters:

group (Integer) - virtual partition group id

messages (Array<Karafka::Messages::Message>) - karafka messages that belong to the given group
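A minimal usage sketch (the variable names and the schedule_work helper below are hypothetical and are not Karafka internals):

partitioner = Karafka::Pro::Processing::Partitioner.new(subscription_group)

partitioner.call('orders_states', messages, coordinator) do |group_id, group_messages|
  # Each yielded group corresponds to one virtual partition and can be handed
  # to a separate worker thread
  schedule_work(group_id, group_messages)
end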



# File 'lib/karafka/pro/processing/partitioner.rb', line 25

def call(topic, messages, coordinator)
  ktopic = @subscription_group.topics.find(topic)

  vps = ktopic.virtual_partitions

  # We only partition the work if we have:
  # - a virtual partitioner
  # - more than one thread to process the data
  # - the processing has not been collapsed via the coordinator
  # - none of the partitioner executions raised an error
  #
  # With one thread it is not worth partitioning the work, as all of it would end up
  # assigned to that single thread anyway (pointless work)
  #
  # We collapse the partitioning on errors because we "regain" full ordering on a batch
  # that potentially contains the data that caused the error.
  #
  # This is useful because it allows us to run things without the parallelization (which
  # adds a bit of uncertainty) and to use the DLQ and safely skip messages if needed.
  if vps.active? && vps.max_partitions > 1 && !coordinator.collapsed?
    # If we cannot virtualize even one message from a given batch due to user errors, we
    # reduce the whole set into one partition and emit an error. This still allows the
    # user flow to continue but mitigates the damage by not virtualizing
    begin
      groupings = messages.group_by do |msg|
        # We need to reduce it to the max concurrency so that the group_id is not a direct
        # effect of the end user action. Otherwise the persistence layer for consumers
        # would cache it forever and that would cause memory leaks
        #
        # This also needs to be consistent because the aggregation here needs to guarantee
        # that the same partitioned message will always be assigned to the same virtual
        # partition. Otherwise, in the case of a window aggregation with VPs spanning
        # several polls, the data could be incomplete.
        vps.reducer.call(
          vps.partitioner.call(msg)
        )
      end
    rescue StandardError => e
      # This should not happen. If you are seeing this, it means your partitioner code
      # failed and raised an error. We highly recommend mitigating partitioner-level errors
      # on the user side because this type of collapse should be considered a last resort
      Karafka.monitor.instrument(
        'error.occurred',
        caller: self,
        error: e,
        messages: messages,
        type: 'virtual_partitions.partitioner.error'
      )

      groupings = { 0 => messages }
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # With no virtual partitioner, this works as the regular partitioner
    yield(0, messages)
  end
end
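To make the grouping step concrete, here is a standalone illustration of how a partitioner combined with a modulo-style reducer bounds the number of groups while keeping the assignment consistent (the lambdas and message hashes are made up for this example and are not the Karafka defaults):

max_partitions = 3

partitioner = ->(msg) { msg[:key] }                              # extracts the virtual key
reducer     = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

messages = [{ key: 'user-1' }, { key: 'user-2' }, { key: 'user-7' }, { key: 'user-1' }]

groupings = messages.group_by { |msg| reducer.call(partitioner.call(msg)) }

# At most 3 groups are ever produced, and both 'user-1' messages always land in the
# same group, so per-key ordering is preserved inside a virtual partition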