Class: Karafka::Instrumentation::Vendors::Datadog::MetricsListener

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::Configurable
Defined in:
lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb

Overview

Note:

You need to setup the dogstatsd-ruby client and assign it

Listener that can be used to subscribe to Karafka to receive stats via StatsD and/or Datadog

Defined Under Namespace

Classes: RdKafkaMetric

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ MetricsListener

Returns a new instance of MetricsListener.

Parameters:

  • block (Proc)

    configuration block



67
68
69
70
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 67

def initialize(&block)
  configure
  setup(&block) if block
end

Instance Method Details

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Reports how many messages we’ve polled and how much time did we spend on it

Parameters:

  • event (Karafka::Core::Monitoring::Event)


108
109
110
111
112
113
114
115
116
117
118
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 108

def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size

  consumer_group_id = event[:subscription_group].consumer_group.id

  extra_tags = ["consumer_group:#{consumer_group_id}"]

  histogram('listener.polling.time_taken', time_taken, tags: default_tags + extra_tags)
  histogram('listener.polling.messages', messages_count, tags: default_tags + extra_tags)
end

#on_consumer_consumed(event) ⇒ Object

Here we report majority of things related to processing as we have access to the consumer

Parameters:

  • event (Karafka::Core::Monitoring::Event)


123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 123

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
   = messages.

  tags = default_tags + consumer_tags(consumer)

  count('consumer.messages', messages.count, tags: tags)
  count('consumer.batches', 1, tags: tags)
  gauge('consumer.offset', .last_offset, tags: tags)
  histogram('consumer.consumed.time_taken', event[:time], tags: tags)
  histogram('consumer.batch_size', messages.count, tags: tags)
  histogram('consumer.processing_lag', .processing_lag, tags: tags)
  histogram('consumer.consumption_lag', .consumption_lag, tags: tags)
end

#on_error_occurred(event) ⇒ Object

Increases the errors count by 1

Parameters:

  • event (Karafka::Core::Monitoring::Event)


95
96
97
98
99
100
101
102
103
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 95

def on_error_occurred(event)
  extra_tags = ["type:#{event[:type]}"]

  if event.payload[:caller].respond_to?(:messages)
    extra_tags += consumer_tags(event.payload[:caller])
  end

  count('error_occurred', 1, tags: default_tags + extra_tags)
end

#on_statistics_emitted(event) ⇒ Object

Hooks up to Karafka instrumentation for emitted statistics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


81
82
83
84
85
86
87
88
89
90
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 81

def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  base_tags = default_tags + ["consumer_group:#{consumer_group_id}"]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, base_tags)
  end
end

#on_worker_process(event) ⇒ Object

Worker related metrics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


158
159
160
161
162
163
164
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 158

def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics

  gauge('worker.total_threads', Karafka::App.config.concurrency, tags: default_tags)
  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
  histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: default_tags)
end

#on_worker_processed(event) ⇒ Object

We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected

Parameters:

  • event (Karafka::Core::Monitoring::Event)


169
170
171
172
173
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 169

def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics

  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
end

#setup(&block) ⇒ Object

Note:

We define this alias to be consistent with WaterDrop#setup

Parameters:

  • block (Proc)

    configuration block



74
75
76
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 74

def setup(&block)
  configure(&block)
end