Class: WaterDrop::Instrumentation::LoggerListener

Inherits:
Object
  • Object
show all
Defined in:
lib/waterdrop/instrumentation/logger_listener.rb

Overview

Note:

It is a module as we can use it then as a part of the Karafka framework listener as well as we can use it standalone

Default listener that hooks up to our instrumentation and uses its events for logging It can be removed/replaced or anything without any harm to the Waterdrop flow

Instance Method Summary collapse

Constructor Details

#initialize(logger, log_messages: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • logger (Object)

    logger we want to use

  • log_messages (Boolean) (defaults to: true)

    Should we report the messages content (payload and metadata) with each message operation.

    This can be extensive, especially when producing a lot of messages. We provide this despite the fact that we only report payloads in debug, because Rails by default operates with debug level. This means, that when working with Rails in development, every single payload dispatched will go to logs. In majority of the cases this is extensive and simply floods the end user.



20
21
22
23
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 20

def initialize(logger, log_messages: true)
  @logger = logger
  @log_messages = log_messages
end

Instance Method Details

#on_buffer_flushed_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



97
98
99
100
101
102
103
104
105
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 97

def on_buffer_flushed_async(event)
  messages = event[:messages]

  info(event, "Async flushing of #{messages.size} messages from the buffer")

  return unless log_messages?

  debug(event, messages)
end

#on_buffer_flushed_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



108
109
110
111
112
113
114
115
116
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 108

def on_buffer_flushed_sync(event)
  messages = event[:messages]

  info(event, "Sync flushing of #{messages.size} messages from the buffer")

  return unless log_messages?

  debug(event, messages)
end

#on_buffer_purged(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



119
120
121
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 119

def on_buffer_purged(event)
  info(event, 'Successfully purging buffer')
end

#on_error_occurred(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the error details



153
154
155
156
157
158
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 153

def on_error_occurred(event)
  error = event[:error]
  type = event[:type]

  error(event, "Error occurred: #{error} - #{type}")
end

#on_message_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



75
76
77
78
79
80
81
82
83
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 75

def on_message_buffered(event)
  message = event[:message]

  info(event, "Buffering of a message to '#{message[:topic]}' topic")

  return unless log_messages?

  debug(event, [message])
end

#on_message_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



26
27
28
29
30
31
32
33
34
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 26

def on_message_produced_async(event)
  message = event[:message]

  info(event, "Message to '#{message[:topic]}' topic was delegated to a dispatch queue")

  return unless log_messages?

  debug(event, message)
end

#on_message_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



37
38
39
40
41
42
43
44
45
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 37

def on_message_produced_sync(event)
  message = event[:message]

  info(event, "Sync producing of a message to '#{message[:topic]}' topic")

  return unless log_messages?

  debug(event, message)
end

#on_messages_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



86
87
88
89
90
91
92
93
94
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 86

def on_messages_buffered(event)
  messages = event[:messages]

  info(event, "Buffering of #{messages.size} messages")

  return unless log_messages?

  debug(event, [messages, messages.size])
end

#on_messages_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 48

def on_messages_produced_async(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count

  info(
    event,
    "#{messages.size} messages to #{topics_count} topics were delegated to a dispatch queue"
  )

  return unless log_messages?

  debug(event, messages)
end

#on_messages_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



63
64
65
66
67
68
69
70
71
72
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 63

def on_messages_produced_sync(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count

  info(event, "Sync producing of #{messages.size} messages to #{topics_count} topics")

  return unless log_messages?

  debug(event, messages)
end

#on_producer_closed(event) ⇒ Object

Note:

While this says “Closing producer”, it produces a nice message with time taken: “Closing producer took 12 ms” indicating it happened in the past.

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



131
132
133
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 131

def on_producer_closed(event)
  info(event, 'Closing producer')
end

#on_producer_closing(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



124
125
126
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 124

def on_producer_closing(event)
  info(event, 'Closing producer')
end

#on_producer_disconnected(event) ⇒ Object

Note:

While this says “Disconnecting producer”, it produces a nice message with time taken: “Disconnecting producer took 5 ms” indicating it happened in the past.

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



143
144
145
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 143

def on_producer_disconnected(event)
  info(event, 'Disconnected producer')
end

#on_producer_disconnecting(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



136
137
138
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 136

def on_producer_disconnecting(event)
  info(event, 'Disconnecting producer')
end

#on_producer_reloaded(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



148
149
150
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 148

def on_producer_reloaded(event)
  info(event, 'Producer successfully reloaded')
end

#on_transaction_aborted(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



166
167
168
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 166

def on_transaction_aborted(event)
  info(event, 'Aborting transaction')
end

#on_transaction_committed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



171
172
173
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 171

def on_transaction_committed(event)
  info(event, 'Committing transaction')
end

#on_transaction_finished(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



190
191
192
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 190

def on_transaction_finished(event)
  info(event, 'Processing transaction')
end

#on_transaction_marked_as_consumed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 176

def on_transaction_marked_as_consumed(event)
  message = event[:message]
  topic = message.topic
  partition = message.partition
  offset = message.offset
  loc = "#{topic}-#{partition}"

  info(
    event,
    "Marking message with offset #{offset} on #{loc} as consumed in a transaction"
  )
end

#on_transaction_started(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



161
162
163
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 161

def on_transaction_started(event)
  info(event, 'Starting transaction')
end