Class: Karafka::Pro::Instrumentation::PerformanceTracker

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/pro/instrumentation/performance_tracker.rb

Overview

Note:

Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.

Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow

Instance Method Summary collapse

Constructor Details

#initializePerformanceTracker

Builds up nested concurrent hash for data tracking



23
24
25
26
27
28
29
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 23

def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

Tracks time taken to process a single message of a given topic partition

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 46

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages..topic
  partition = messages..partition

  samples = @processing_times[topic][partition]
  samples << event[:time] / messages.count

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition



34
35
36
37
38
39
40
41
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 34

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end