Class: Karafka::Instrumentation::LoggerListener

Inherits:
Object
Defined in:
lib/karafka/instrumentation/logger_listener.rb

Overview

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the Karafka app flow.

Instance Method Summary

Constructor Details

#initialize(log_polling: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • log_polling (Boolean) (defaults to: true)

    whether to log the fact that messages are being polled. This is usually noisy and not useful in production, but can be helpful in development. While users could filter this themselves, it has been requested often, so similar to how extensive logging can be disabled in WaterDrop, we support disabling it here as well.



# File 'lib/karafka/instrumentation/logger_listener.rb', line 23

def initialize(log_polling: true)
  @log_polling = log_polling
end
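
This listener is enabled by default, but a custom instance can be subscribed during setup to tune its behavior. A configuration sketch, for example in `karafka.rb` (assumes the default monitor is in use):

```ruby
# karafka.rb (sketch) - subscribe a listener instance that
# skips the noisy per-poll log lines
Karafka.monitor.subscribe(
  Karafka::Instrumentation::LoggerListener.new(log_polling: false)
)
```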

Instance Method Details

#on_app_quiet(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 205

def on_app_quiet(event)
  info "[#{event[:server_id]}] Reached quiet mode. No messages will be processed anymore"
end

#on_app_quieting(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 200

def on_app_quieting(event)
  info "[#{event[:server_id]}] Switching to quiet mode. New messages will not be processed"
end

#on_app_running(event) ⇒ Object

Logs info that we’re running the Karafka app.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 188

def on_app_running(event)
  server_id = event[:server_id]

  info "[#{server_id}] Running in #{RUBY_DESCRIPTION}"
  info "[#{server_id}] Running Karafka #{Karafka::VERSION} server"

  return if Karafka.pro?

  info "[#{server_id}] See LICENSE and the LGPL-3.0 for licensing details"
end

#on_app_stopped(event) ⇒ Object

Logs info that we stopped the Karafka server.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 219

def on_app_stopped(event)
  info "[#{event[:server_id]}] Stopped Karafka server"
end

#on_app_stopping(event) ⇒ Object

Logs info that we’re going to stop the Karafka server.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 212

def on_app_stopping(event)
  info "[#{event[:server_id]}] Stopping Karafka server"
end

#on_client_pause(event) ⇒ Object

Note:

There may be no offset provided when the user wants to pause on the consecutive offset position. This can be beneficial when not wanting to purge the buffers.

Prints info about a consumer pause occurrence, whether user or system initiated.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 103

def on_client_pause(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}]
    Pausing on topic #{topic}-#{partition}
    on #{offset ? "offset #{offset}" : 'the consecutive offset'}
  MSG
end
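
The `<<~MSG.tr("\n", ' ').strip!` idiom used above (and in several other handlers) lets a long log line be written as a readable multi-line heredoc and then collapsed into a single line. A minimal standalone illustration:

```ruby
# Squiggly heredoc keeps the source readable; tr folds the
# newlines into spaces, and strip! drops the trailing one
message = <<~MSG.tr("\n", ' ').strip!
  Pausing on topic orders-0
  on offset 42
MSG

puts message # prints: Pausing on topic orders-0 on offset 42
```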

#on_client_resume(event) ⇒ Object

Prints information about resuming processing of a given topic partition.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 119

def on_client_resume(event)
  topic = event[:topic]
  partition = event[:partition]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Resuming on topic #{topic}-#{partition}
  MSG
end

#on_connection_listener_before_fetch_loop(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 30

def on_connection_listener_before_fetch_loop(event)
  listener_id = event[:caller].id
  subscription_group = event[:subscription_group]
  consumer_group_id = subscription_group.consumer_group.id
  topics = subscription_group.topics.select(&:active?).map(&:name).join(', ')
  group_details = "#{consumer_group_id}/#{subscription_group.id}"

  info(
    "[#{listener_id}] Group #{group_details} subscribing to topics: #{topics}"
  )
end

#on_connection_listener_fetch_loop(event) ⇒ Object

Logs each message-fetching attempt.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 45

def on_connection_listener_fetch_loop(event)
  return unless log_polling?

  listener_id = event[:caller].id
  debug "[#{listener_id}] Polling messages..."
end

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Logs info about messages received from Kafka.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 55

def on_connection_listener_fetch_loop_received(event)
  return unless log_polling?

  listener_id = event[:caller].id
  time = event[:time].round(2)
  messages_count = event[:messages_buffer].size

  message = "[#{listener_id}] Polled #{messages_count} messages in #{time}ms"

  # We don't want the "polled 0" in dev as it would spam the log
  # Instead we publish only info when there was anything we could poll and fail over to the
  # zero notifications when in debug mode
  messages_count.zero? ? debug(message) : info(message)
end
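
The zero-count branch above routes empty polls to the debug level, so they only surface when debug logging is enabled. The same routing can be reproduced with the stdlib Logger (a sketch; the message text is illustrative):

```ruby
require 'logger'
require 'stringio'

# Capture output so we can see what survives at INFO level
out = StringIO.new
logger = Logger.new(out)
logger.level = Logger::INFO

[0, 5].each do |messages_count|
  message = "Polled #{messages_count} messages in 10.0ms"
  # Zero-result polls go to debug and are hidden at INFO level
  messages_count.zero? ? logger.debug(message) : logger.info(message)
end

puts out.string # only the "Polled 5" line appears
```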

#on_consumer_consuming_retry(event) ⇒ Object

Prints info about a processing retry after an error.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 132

def on_consumer_consuming_retry(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  consumer = event[:caller]
  timeout = event[:timeout]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Retrying of #{consumer.class} after #{timeout} ms
    on topic #{topic}-#{partition} from offset #{offset}
  MSG
end

#on_consumer_consuming_seek(event) ⇒ Object

Prints info about seeking to a particular location

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 148

def on_consumer_consuming_seek(event)
  topic = event[:topic]
  partition = event[:partition]
  seek_offset = event[:message].offset
  consumer = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Seeking from #{consumer.class}
    on topic #{topic}-#{partition} to offset #{seek_offset}
  MSG
end

#on_dead_letter_queue_dispatched(event) ⇒ Object

Logs info when we have dispatched a message to the DLQ.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 262

def on_dead_letter_queue_dispatched(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  message = event[:message]
  offset = message.offset
  dlq_topic = consumer.topic.dead_letter_queue.topic
  partition = message.partition

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Dispatched message #{offset}
    from #{topic}-#{partition}
    to DLQ topic: #{dlq_topic}
  MSG
end

#on_error_occurred(event) ⇒ Object

There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 341

def on_error_occurred(event)
  type = event[:type]
  error = event[:error]
  details = (error.backtrace || []).join("\n")

  case type
  when 'consumer.initialized.error'
    error "Consumer initialized error: #{error}"
    error details
  when 'consumer.wrap.error'
    error "Consumer wrap failed due to an error: #{error}"
    error details
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
    error details
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
    error details
  when 'consumer.idle.error'
    error "Consumer idle failed due to an error: #{error}"
    error details
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
    error details
  when 'consumer.tick.error'
    error "Consumer on tick failed due to an error: #{error}"
    error details
  when 'consumer.eofed.error'
    error "Consumer on eofed failed due to an error: #{error}"
    error details
  when 'consumer.after_consume.error'
    error "Consumer on after_consume failed due to an error: #{error}"
    error details
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
    fatal details
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
    error details
  when 'swarm.supervisor.error'
    fatal "Swarm supervisor crashed due to an error: #{error}"
    fatal details
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
    fatal details
  when 'app.stopping.error'
    # Counts number of workers and listeners that were still active when forcing the
    # shutdown. Please note, that unless all listeners are closed, workers will not finalize
    # their operations as well.
    # We need to check if listeners and workers are assigned as during super early stages of
    # boot they are not.
    listeners = Server.listeners ? Server.listeners.count(&:active?) : 0
    workers = Server.workers ? Server.workers.count(&:alive?) : 0

    message = <<~MSG.tr("\n", ' ').strip!
      Forceful Karafka server stop with:
      #{workers} active workers and
      #{listeners} active listeners
    MSG

    error message
  when 'app.forceful_stopping.error'
    error "Forceful shutdown error occurred: #{error}"
    error details
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
    error details
  # Those can occur when emitted statistics are consumed by the end user and the processing
  # of statistics fails. The statistics are emitted from librdkafka main loop thread and
  # any errors there crash the whole thread
  when 'callbacks.statistics.error'
    error "callbacks.statistics processing failed due to an error: #{error}"
    error details
  when 'callbacks.error.error'
    error "callbacks.error processing failed due to an error: #{error}"
    error details
  # Those will only occur when retries in the client fail and when they did not stop after
  # back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
    error details
  when 'connection.client.rebalance_callback.error'
    error "Rebalance callback error occurred: #{error}"
    error details
  when 'connection.client.unsubscribe.error'
    error "Client unsubscribe error occurred: #{error}"
    error details
  when 'parallel_segments.reducer.error'
    error "Parallel segments reducer error occurred: #{error}"
    error details
  when 'parallel_segments.partitioner.error'
    error "Parallel segments partitioner error occurred: #{error}"
    error details
  when 'virtual_partitions.partitioner.error'
    error "Virtual partitions partitioner error occurred: #{error}"
    error details
  # This handles any custom errors coming from places like Web-UI, etc
  else
    error "#{type} error occurred: #{error.class} - #{error}"
    error details
  end
end
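
Because everything funnels through the single `error.occurred` event, an application that wants its own error handling (for example, shipping errors to a tracker) can subscribe alongside or instead of this listener. A configuration sketch; `ErrorTracker` is a hypothetical stand-in for your own reporter:

```ruby
# karafka.rb (sketch) - forward all instrumented errors elsewhere;
# ErrorTracker.notify is hypothetical, not part of Karafka
Karafka.monitor.subscribe('error.occurred') do |event|
  ErrorTracker.notify(event[:error], type: event[:type])
end
```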

#on_filtering_seek(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 296

def on_filtering_seek(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Message to which we seek
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Post-filtering seeking to message #{offset}
    on #{topic}-#{partition}
  MSG
end

#on_filtering_throttled(event) ⇒ Object

Logs info about a throttling event.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 280

def on_filtering_throttled(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Here we get last message before throttle
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Throttled and will resume
    from message #{offset}
    on #{topic}-#{partition}
  MSG
end

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received and prints backtraces for all threads in case of SIGTTIN.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 164

def on_process_notice_signal(event)
  server_id = Karafka::Server.id
  info "[#{server_id}] Received #{event[:signal]} system signal"

  # We print backtrace only for ttin
  return unless event[:signal] == :SIGTTIN

  # Inspired by Sidekiq
  Thread.list.each do |thread|
    tid = (thread.object_id ^ ::Process.pid).to_s(36)

    warn "Thread TID-#{tid} #{thread.name}"

    if thread.backtrace
      warn thread.backtrace.join("\n")
    else
      warn '<no backtrace available>'
    end
  end
end
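
The per-thread TID printed above follows the Sidekiq convention: the thread's object id XOR-ed with the process pid, base-36 encoded. A self-contained check of that derivation:

```ruby
# Reproduce the TID derivation used in the SIGTTIN handler
tid = (Thread.current.object_id ^ ::Process.pid).to_s(36)

# Base-36 encoding yields only digits and lowercase letters
puts "Thread TID-#{tid}"
```

Backtraces can then be requested from a running server with something like `kill -TTIN <pid>`.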

#on_rebalance_partitions_assigned(event) ⇒ Object

Logs info about partitions that we’ve gained

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details with assigned partitions



# File 'lib/karafka/instrumentation/logger_listener.rb', line 244

def on_rebalance_partitions_assigned(event)
  assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  group_id = event[:consumer_group_id]
  client_id = event[:client_id]
  group_prefix = "[#{client_id}] Group #{group_id} rebalance"

  if assigned_partitions.empty?
    info "#{group_prefix}: No partitions assigned"
  else
    assigned_partitions.each do |topic, partitions|
      info "#{group_prefix}: #{topic}-[#{partitions.join(',')}] assigned"
    end
  end
end
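
The `event[:tpl]` payload is a topic-partition list whose `to_h` maps topic names to lists of partition objects; `transform_values` reduces it to plain partition numbers. A minimal stand-in, using a Struct in place of the real rdkafka partition objects:

```ruby
# Partition is a hypothetical stand-in for rdkafka partition
# objects, which respond to #partition
Partition = Struct.new(:partition)

tpl_hash = {
  'orders'   => [Partition.new(0), Partition.new(2)],
  'payments' => [Partition.new(1)]
}

# Collapse each list of partition objects into partition numbers
assigned = tpl_hash.transform_values { |parts| parts.map(&:partition) }
```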

#on_rebalance_partitions_revoked(event) ⇒ Object

Logs info about partitions we have lost

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details with revoked partitions



# File 'lib/karafka/instrumentation/logger_listener.rb', line 226

def on_rebalance_partitions_revoked(event)
  revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  group_id = event[:consumer_group_id]
  client_id = event[:client_id]
  group_prefix = "[#{client_id}] Group #{group_id} rebalance"

  if revoked_partitions.empty?
    info "#{group_prefix}: No partitions revoked"
  else
    revoked_partitions.each do |topic, partitions|
      info "#{group_prefix}: #{topic}-[#{partitions.join(',')}] revoked"
    end
  end
end

#on_swarm_manager_before_fork(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 323

def on_swarm_manager_before_fork(event)
  debug "Swarm manager starting node with id: #{event[:node].id}"
end

#on_swarm_manager_control(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 333

def on_swarm_manager_control(event)
  pids = event[:caller].nodes.map(&:pid).join(', ')
  debug "Swarm manager checking nodes: #{pids}"
end

#on_swarm_manager_stopping(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 311

def on_swarm_manager_stopping(event)
  node = event[:node]
  error "Swarm manager detected unhealthy node #{node.pid}. Sending TERM signal..."
end

#on_swarm_manager_terminating(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 317

def on_swarm_manager_terminating(event)
  node = event[:node]
  error "Swarm manager detected unresponsive node #{node.pid}. Sending KILL signal..."
end

#on_swarm_node_after_fork(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 328

def on_swarm_node_after_fork(_event)
  info "Swarm node #{::Process.pid} forked from #{::Process.ppid}"
end

#on_worker_process(event) ⇒ Object

Prints info about the fact that a given job has started

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 73

def on_worker_process(event)
  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic}-#{partition} started"
end

#on_worker_processed(event) ⇒ Object

Prints info about the fact that a given job has finished

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 85

def on_worker_processed(event)
  job = event[:job]
  time = event[:time].round(2)
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info <<~MSG.tr("\n", ' ').strip!
    [#{job.id}] #{job_type} job for #{consumer}
    on #{topic}-#{partition} finished in #{time} ms
  MSG
end