Class: Karafka::Web::Tracking::Consumers::Listeners::Statistics
- Defined in:
- lib/karafka/web/tracking/consumers/listeners/statistics.rb
Overview
Listener used to collect metrics published by librdkafka
Instance Method Summary collapse
-
#on_statistics_emitted(event) ⇒ Object
Collect Kafka metrics.
Instance Method Details
#on_statistics_emitted(event) ⇒ Object
Collect Kafka metrics
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/karafka/web/tracking/consumers/listeners/statistics.rb', line 13 def on_statistics_emitted(event) statistics = event[:statistics] topics = statistics.fetch('topics') cgrp = statistics.fetch('cgrp') cg_id = event[:consumer_group_id] sg_id = event[:subscription_group_id] sg_details = extract_sg_details(sg_id, cgrp) track_transfers(statistics) # More than one subscription group from the same consumer group may be reporting # almost the same time. To prevent corruption of partial data, we put everything here # in track as we merge data from multiple subscription groups track do |sampler| topics.each do |topic_name, topic_values| partitions = topic_values.fetch('partitions') partitions.each do |pt_name, pt_stats| pt_id = pt_name.to_i next unless partition_reportable?(pt_id, pt_stats) metrics = extract_partition_metrics(pt_stats) next if metrics.empty? topics_details = sg_details[:topics] topic_details = topics_details[topic_name] ||= { name: topic_name, partitions: {} } topic_details[:partitions][pt_id] = metrics.merge( id: pt_id ).merge( # Pauses are stored on a consumer group since we do not process same topic # twice in the multiple subscription groups poll_details(sg_id, topic_name, pt_id) ) end end sampler.consumer_groups[cg_id][:subscription_groups][sg_id] = sg_details end end |