Class: Karafka::Instrumentation::Vendors::Appsignal::MetricsListener

Inherits:
Base
  • Object
Defined in:
lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb

Overview

Listener that ships metrics to Appsignal
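
A minimal subscription sketch, assuming the appsignal gem is installed and configured:

require 'karafka'

# Subscribe the listener so Karafka events are shipped to Appsignal
Karafka.monitor.subscribe(
  ::Karafka::Instrumentation::Vendors::Appsignal::MetricsListener.new
)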

Defined Under Namespace

Classes: RdKafkaMetric

Instance Method Summary

Methods inherited from Base

#initialize, #setup

Constructor Details

This class inherits a constructor from Karafka::Instrumentation::Vendors::Appsignal::Base

Instance Method Details

#count(key, value, tags) ⇒ Object

Increments a counter with a namespaced key, value and tags

Parameters:

  • key (String)

    key we want to use (without the namespace)

  • value (Integer)

    count value

  • tags (Hash)

    additional tags



# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 263

def count(key, value, tags)
  client.count(
    namespaced_metric(key),
    value,
    tags
  )
end
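
A hypothetical direct call, for illustration only (in practice the listener invokes #count itself; the final metric name assumes the default 'karafka' namespace applied by #namespaced_metric):

listener = ::Karafka::Instrumentation::Vendors::Appsignal::MetricsListener.new

# Bumps the karafka_consumer_messages counter by 10 with the given tags
listener.count('consumer_messages', 10, topic: 'events', consumer_group: 'example_app')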

#gauge(key, value, tags) ⇒ Object

Sets the gauge value

Parameters:

  • key (String)

    key we want to use (without the namespace)

  • value (Integer)

    gauge value

  • tags (Hash)

    additional tags



# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 276

def gauge(key, value, tags)
  client.gauge(
    namespaced_metric(key),
    value,
    tags
  )
end
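
Unlike #count, which accumulates increments, a gauge records the most recent value; an illustrative call with hypothetical tags:

# listener as instantiated in the #count example above
# Records the current offset; a later call simply overwrites this value
listener.gauge('consumer_offsets', 42, topic: 'events', partition: 0)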

#on_app_running(_event) ⇒ Object

Registers the minute-based probe only once the app is running. Otherwise, if we always registered the minute probe, it would report from processes that use Karafka but do not run the consumption process

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 98

def on_app_running(_event)
  return if @probe_registered

  @probe_registered = true

  # Registers the minutely probe for one-every-minute metrics
  client.register_probe(:karafka, -> { minute_probe })
end
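
Thanks to the @probe_registered guard, the registration is idempotent; an illustrative sketch (the app.running event is normally emitted by the Karafka server process itself, not by hand):

Karafka.monitor.instrument('app.running') # registers the minutely probe
Karafka.monitor.instrument('app.running') # no-op, probe already registered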

#on_consumer_consume(event) ⇒ Object

Before each consumption, we start a transaction associated with it. We also set some basic metadata about the given consumption that can be useful for debugging

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 60

def on_consumer_consume(event)
  consumer = event.payload[:caller]

  start_transaction(consumer, 'consume')

  client.metadata = {
    batch_size: consumer.messages.size,
    first_offset: consumer.messages.metadata.first_offset,
    last_offset: consumer.messages.metadata.last_offset,
    consumer_group: consumer.topic.consumer_group.id,
    topic: consumer.topic.name,
    partition: consumer.partition,
    attempt: consumer.coordinator.pause_tracker.attempt
  }
end

#on_consumer_consumed(event) ⇒ Object

Once we're done with consumption, we bump the related counters

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 79

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
  metadata = messages.metadata

  with_multiple_resolutions(consumer) do |tags|
    count('consumer_messages', messages.size, tags)
    count('consumer_batches', 1, tags)
    gauge('consumer_offsets', metadata.last_offset, tags)
  end

  stop_transaction
end

#on_dead_letter_queue_dispatched(event) ⇒ Object

Counts DLQ dispatches

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 133

def on_dead_letter_queue_dispatched(event)
  consumer = event.payload[:caller]

  with_multiple_resolutions(consumer) do |tags|
    count('consumer_dead', 1, tags)
  end
end

#on_error_occurred(event) ⇒ Object

Reports on any error that occurs. This also includes non-user-related errors originating from the framework.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    error event details



# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 145

def on_error_occurred(event)
  # If this is a user consumption related error, we bump the counters for metrics
  if USER_CONSUMER_ERROR_TYPES.include?(event[:type])
    consumer = event.payload[:caller]

    with_multiple_resolutions(consumer) do |tags|
      count('consumer_errors', 1, tags)
    end
  end

  stop_transaction
end

#on_statistics_emitted(event) ⇒ Object

Hooks up to Karafka instrumentation for emitted statistics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 161

def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, consumer_group_id)
  end

  report_aggregated_topics_metrics(statistics, consumer_group_id)
end
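
Statistics events are only emitted when librdkafka statistics are enabled; a configuration sketch (broker address is a placeholder):

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092',
      # Emit librdkafka statistics (and thus statistics.emitted events)
      # every five seconds
      'statistics.interval.ms': 5_000
    }
  end
end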

#report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object

Publishes aggregated topic-level metrics that are the sum of the per-partition metrics

Parameters:

  • statistics (Hash)

    hash with all the statistics emitted

  • consumer_group_id (String)

    id of the consumer group in whose context we operate



# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 231

def report_aggregated_topics_metrics(statistics, consumer_group_id)
  config.aggregated_rd_kafka_metrics.each do |metric|
    statistics.fetch('topics').each do |topic_name, topic_values|
      sum = 0

      topic_values['partitions'].each do |partition_name, partition_statistics|
        next if partition_name == '-1'
        # Skip until lag info is available
        next if partition_statistics['consumer_lag'] == -1
        next if partition_statistics['consumer_lag_stored'] == -1

        sum += partition_statistics.dig(*metric.key_location)
      end

      public_send(
        metric.type,
        metric.name,
        sum,
        {
          consumer_group: consumer_group_id,
          topic: topic_name
        }
      )
    end
  end
end
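
An abridged sketch of the statistics shape this method consumes (real librdkafka payloads carry many more fields; the values here are made up):

statistics = {
  'topics' => {
    'events' => {
      'partitions' => {
        # included in the topic-level sum
        '0' => { 'consumer_lag' => 5, 'consumer_lag_stored' => 5 },
        # the internal '-1' partition and unknown (-1) lags are skipped
        '-1' => { 'consumer_lag' => -1, 'consumer_lag_stored' => -1 }
      }
    }
  }
}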

#report_metric(metric, statistics, consumer_group_id) ⇒ Object

Reports the given metric's statistics to Appsignal

Parameters:

  • metric (RdKafkaMetric)

    metric value object

  • statistics (Hash)

    hash with all the statistics emitted

  • consumer_group_id (String)

    id of the consumer group in whose context we operate



# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 176

def report_metric(metric, statistics, consumer_group_id)
  case metric.scope
  when :root
    # Do nothing on the root metrics as the same metrics are reported in a granular
    # way from other places
    nil
  when :brokers
    statistics.fetch('brokers').each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics['nodeid'] == -1

      public_send(
        metric.type,
        metric.name,
        broker_statistics.dig(*metric.key_location),
        {
          broker: broker_statistics['nodename']
        }
      )
    end
  when :topics
    statistics.fetch('topics').each do |topic_name, topic_values|
      topic_values['partitions'].each do |partition_name, partition_statistics|
        next if partition_name == '-1'
        # Skip until lag info is available
        next if partition_statistics['consumer_lag'] == -1
        next if partition_statistics['consumer_lag_stored'] == -1

        # Skip if we do not own the fetch assignment
        next if partition_statistics['fetch_state'] == 'stopped'
        next if partition_statistics['fetch_state'] == 'none'

        public_send(
          metric.type,
          metric.name,
          partition_statistics.dig(*metric.key_location),
          {
            consumer_group: consumer_group_id,
            topic: topic_name,
            partition: partition_name
          }
        )
      end
    end
  else
    raise ArgumentError, metric.scope
  end
end
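
For reference, a sketch of the RdKafkaMetric value object that drives this dispatch, assuming the Struct shape Karafka uses in its vendor listeners (the shipped defaults live in the listener configuration):

# type is the reporting method (:count or :gauge), scope picks the branch
# above, name is the metric name and key_location the #dig path into the
# relevant statistics sub-hash
RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)

RdKafkaMetric.new(:gauge, :topics, 'consumer_lag', %w[consumer_lag])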