Class: Karafka::Web::Tracking::Consumers::Listeners::Statistics

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/web/tracking/consumers/listeners/statistics.rb

Overview

Listener used to collect metrics published by librdkafka

Instance Method Summary collapse

Instance Method Details

#on_statistics_emitted(event) ⇒ Object

Collect Kafka metrics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/web/tracking/consumers/listeners/statistics.rb', line 13

# Collects the Kafka metrics carried by a statistics event and stores them, per topic
# partition, under the subscription group that emitted the event.
#
# @param event [Karafka::Core::Monitoring::Event] event exposing `:statistics`,
#   `:consumer_group_id` and `:subscription_group_id`
def on_statistics_emitted(event)
  stats = event[:statistics]
  # Both sections are mandatory: `fetch` raises when either one is missing
  topics_stats = stats.fetch('topics')
  group_stats = stats.fetch('cgrp')
  cg_id = event[:consumer_group_id]
  sg_id = event[:subscription_group_id]
  sg_details = extract_sg_details(sg_id, group_stats)

  track_transfers(stats)

  # Several subscription groups of one consumer group may report at nearly the same
  # moment. Since their data is merged together, all the merging happens inside `track`
  # so partially built data does not get corrupted
  track do |sampler|
    topics_stats.each do |topic_name, topic_stats|
      topic_stats.fetch('partitions').each do |raw_pt_id, partition_stats|
        pt_id = raw_pt_id.to_i
        next unless partition_reportable?(pt_id, partition_stats)

        metrics = extract_partition_metrics(partition_stats)
        next if metrics.empty?

        # Topic entry is created lazily, only once there is a partition worth reporting
        topic_entry = (
          sg_details[:topics][topic_name] ||= { name: topic_name, partitions: {} }
        )

        # Pauses are stored on a consumer group since the same topic is not processed
        # twice across multiple subscription groups
        polling = poll_details(sg_id, topic_name, pt_id)

        topic_entry[:partitions][pt_id] = metrics.merge(id: pt_id).merge(polling)
      end
    end

    sampler.consumer_groups[cg_id][:subscription_groups][sg_id] = sg_details
  end
end