Class: Karafka::Pro::Instrumentation::PerformanceTracker
- Inherits:
-
Object
- Object
- Karafka::Pro::Instrumentation::PerformanceTracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/instrumentation/performance_tracker.rb
Overview
Note:
Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.
Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow
Instance Method Summary collapse
-
#initialize ⇒ PerformanceTracker
constructor
Builds up nested concurrent hash for data tracking.
-
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition.
-
#processing_time_p95(topic, partition) ⇒ Float
P95 processing time of a single message from a single topic partition.
Constructor Details
#initialize ⇒ PerformanceTracker
Builds up nested concurrent hash for data tracking
23 24 25 26 27 28 29 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 23 def initialize @processing_times = Hash.new do |topics_hash, topic| topics_hash[topic] = Hash.new do |partitions_hash, partition| partitions_hash[partition] = [] end end end |
Instance Method Details
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition
46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 46 def on_consumer_consumed(event) consumer = event[:caller] = consumer. topic = ..topic partition = ..partition samples = @processing_times[topic][partition] samples << event[:time] / .count return unless samples.size > SAMPLES_COUNT samples.shift end |
#processing_time_p95(topic, partition) ⇒ Float
Returns p95 processing time of a single message from a single topic partition.
34 35 36 37 38 39 40 41 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 34 def processing_time_p95(topic, partition) values = @processing_times[topic][partition] return 0 if values.empty? return values.first if values.size == 1 percentile(0.95, values) end |