Class: Karafka::Pro::Instrumentation::PerformanceTracker

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/pro/instrumentation/performance_tracker.rb

Overview

Note:

Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.

Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow

Instance Method Summary collapse

Constructor Details

#initializePerformanceTracker

Builds up nested concurrent hash for data tracking

[View source]

23
24
25
26
27
28
29
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 23

def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

Tracks time taken to process a single message of a given topic partition

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details

[View source]

46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 46

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages..topic
  partition = messages..partition

  samples = @processing_times[topic][partition]
  samples << event[:time] / messages.count

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition

[View source]

34
35
36
37
38
39
40
41
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 34

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end