Class: Karafka::Pro::Instrumentation::PerformanceTracker
- Inherits:
-
Object
- Object
- Karafka::Pro::Instrumentation::PerformanceTracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/instrumentation/performance_tracker.rb
Overview
Note:
Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.
Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow
Instance Method Summary collapse
-
#initialize ⇒ PerformanceTracker
constructor
Builds up nested concurrent hash for data tracking.
-
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition.
-
#processing_time_p95(topic, partition) ⇒ Float
P95 processing time of a single message from a single topic partition.
Constructor Details
#initialize ⇒ PerformanceTracker
Builds up nested concurrent hash for data tracking
31 32 33 34 35 36 37 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 31 def initialize @processing_times = Hash.new do |topics_hash, topic| topics_hash[topic] = Hash.new do |partitions_hash, partition| partitions_hash[partition] = [] end end end |
Instance Method Details
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition
54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 54 def on_consumer_consumed(event) consumer = event[:caller] = consumer. topic = ..topic partition = ..partition samples = @processing_times[topic][partition] samples << event[:time] / .count return unless samples.size > SAMPLES_COUNT samples.shift end |
#processing_time_p95(topic, partition) ⇒ Float
Returns p95 processing time of a single message from a single topic partition.
42 43 44 45 46 47 48 49 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 42 def processing_time_p95(topic, partition) values = @processing_times[topic][partition] return 0 if values.empty? return values.first if values.size == 1 percentile(0.95, values) end |