Class: Karafka::Pro::Processing::Partitioner
- Inherits: Karafka::Processing::Partitioner
  - Object
  - Karafka::Processing::Partitioner
  - Karafka::Pro::Processing::Partitioner
- Defined in: lib/karafka/pro/processing/partitioner.rb
Overview
Pro partitioner that can distribute work based on the virtual partitioner settings
Instance Method Summary
Methods inherited from Karafka::Processing::Partitioner
Constructor Details
This class inherits a constructor from Karafka::Processing::Partitioner
Instance Method Details
#call(topic, messages, coordinator) {|group, karafka| ... } ⇒ Object
# File 'lib/karafka/pro/processing/partitioner.rb', line 25

def call(topic, messages, coordinator)
  ktopic = @subscription_group.topics.find(topic)

  vps = ktopic.virtual_partitions

  # We only partition work if we have:
  # - a virtual partitioner
  # - more than one thread to process the data
  # - collective is not collapsed via coordinator
  # - none of the partitioner executions raised an error
  #
  # With one thread it is not worth partitioning the work as the work itself will be
  # assigned to one thread (pointless work)
  #
  # We collapse the partitioning on errors because we "regain" full ordering on a batch
  # that potentially contains the data that caused the error.
  #
  # This is great because it allows us to run things without the parallelization that adds
  # a bit of uncertainty and allows us to use DLQ and safely skip messages if needed.
  if vps.active? && vps.max_partitions > 1 && !coordinator.collapsed?
    # If we cannot virtualize even one message from a given batch due to user errors, we
    # reduce the whole set into one partition and emit error. This should still allow for
    # user flow but should mitigate damages by not virtualizing
    begin
      groupings = messages.group_by do |msg|
        # We need to reduce it to the max concurrency, so the group_id is not a direct
        # effect of the end user action. Otherwise the persistence layer for consumers
        # would cache it forever and it would cause memory leaks
        #
        # This also needs to be consistent because the aggregation here needs to warrant,
        # that the same partitioned message will always be assigned to the same virtual
        # partition. Otherwise in case of a window aggregation with VP spanning across
        # several polls, the data could not be complete.
        vps.reducer.call(
          vps.partitioner.call(msg)
        )
      end
    rescue StandardError => e
      # This should not happen. If you are seeing this it means your partitioner code
      # failed and raised an error. We highly recommend mitigating partitioner level errors
      # on the user side because this type of collapse should be considered a last resort
      Karafka.monitor.instrument(
        'error.occurred',
        caller: self,
        error: e,
        messages: messages,
        type: 'virtual_partitions.partitioner.error'
      )

      groupings = { 0 => messages }
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # When no virtual partitioner, works as regular one
    yield(0, messages)
  end
end
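The grouping logic above can be tried in isolation with plain Ruby. The sketch below is only an illustration and does not use Karafka: the Message struct, the partition helper and the byte-sum reducer are hypothetical stand-ins chosen for this example, not the library's API or its actual reducer. It shows the two properties the source comments call out: the reducer bounds the group id by max_partitions and is deterministic for a given virtual key, and a partitioner failure collapses the whole batch into group 0.

```ruby
# Hypothetical stand-in for a Kafka message; not a Karafka class.
Message = Struct.new(:key, :payload)

# Groups messages into at most `max_partitions` buckets.
# `partitioner` maps a message to a virtual key. The reducer used here
# (byte sum modulo max_partitions) is an assumption made for this sketch.
def partition(messages, partitioner:, max_partitions:, collapsed: false)
  # Nothing to parallelize with one partition or when work is collapsed
  return { 0 => messages } if collapsed || max_partitions <= 1

  # Deterministic and bounded: the same key always yields the same group id
  reducer = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

  begin
    messages.group_by { |msg| reducer.call(partitioner.call(msg)) }
  rescue StandardError => e
    # Last resort: report the failure and fall back to a single group
    warn "partitioner failed: #{e.message}"
    { 0 => messages }
  end
end

messages = [
  Message.new('a', 1),
  Message.new('b', 2),
  Message.new('a', 3),
  Message.new('c', 4)
]

groups = partition(messages, partitioner: ->(m) { m.key }, max_partitions: 2)

groups.each do |group_id, batch|
  puts "group #{group_id}: #{batch.map(&:payload).inspect}"
end
```

Messages that share a key always end up in the same group, which is what keeps per-key ordering intact when groups are processed in parallel.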