Class: WaterDrop::Instrumentation::LoggerListener

Inherits:
Object
Defined in:
lib/waterdrop/instrumentation/logger_listener.rb

Overview

Note:

It is designed so that it can be used either as part of the Karafka framework listener or standalone.

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the WaterDrop flow.

Constructor Details

#initialize(logger, log_messages: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • logger (Object)

    logger we want to use

  • log_messages (Boolean) (defaults to: true)

    Whether to report the message content (payload and metadata) with each message operation.

    This can be verbose, especially when producing a lot of messages. The option exists even though payloads are only reported at the debug level, because Rails operates at the debug level by default. As a result, when working with Rails in development, every dispatched payload goes to the logs. In most cases this is excessive and simply floods the end user.



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 20

def initialize(logger, log_messages: true)
  @logger = logger
  @log_messages = log_messages
end
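
The effect of `log_messages` can be sketched with nothing but Ruby's standard `Logger`. The `SketchListener` class below is a hypothetical stand-in, not WaterDrop's class: it copies only the `return unless log_messages?` guard visible in the handlers on this page, and it calls the logger directly and uses a plain-hash event as simplifying assumptions.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for illustration only; not the WaterDrop implementation.
class SketchListener
  def initialize(logger, log_messages: true)
    @logger = logger
    @log_messages = log_messages
  end

  # Mirrors the handler shape shown on this page: always log a summary at
  # info level, log the payload at debug level only when log_messages is on.
  def on_message_produced_sync(event)
    message = event[:message]

    @logger.info("Sync producing of a message to '#{message[:topic]}' topic")

    return unless @log_messages

    @logger.debug(message.inspect)
  end
end

# Runs one made-up event through a debug-level logger and returns the log text.
def capture(log_messages)
  io = StringIO.new
  logger = Logger.new(io, level: Logger::DEBUG)
  listener = SketchListener.new(logger, log_messages: log_messages)
  listener.on_message_produced_sync({ message: { topic: 'events', payload: 'secret' } })
  io.string
end

puts capture(false)
```

Even with the logger at debug level, as in a default Rails development setup, passing `log_messages: false` keeps the payload out of the log while the info-level summary is still written.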

Instance Method Details

#on_buffer_flushed_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 97

def on_buffer_flushed_async(event)
  messages = event[:messages]

  info(event, "Async flushing of #{messages.size} messages from the buffer")

  return unless log_messages?

  debug(event, messages)
end

#on_buffer_flushed_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 108

def on_buffer_flushed_sync(event)
  messages = event[:messages]

  info(event, "Sync flushing of #{messages.size} messages from the buffer")

  return unless log_messages?

  debug(event, messages)
end

#on_buffer_purged(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 119

def on_buffer_purged(event)
  info(event, 'Successfully purging buffer')
end

#on_error_occurred(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the error details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 141

def on_error_occurred(event)
  error = event[:error]
  type = event[:type]

  error(event, "Error occurred: #{error} - #{type}")
end
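
The handler above assigns a local variable named `error` and then calls a method named `error`. In Ruby, an identifier followed by a parenthesized argument list is parsed as a method call, so the two do not clash. The sketch below demonstrates this with a hypothetical `ErrorSketch` class; its private `error` helper simply collects lines and is an assumption, since the real helper is not shown on this page. The error and type values are made up.

```ruby
# Hypothetical class for illustration; not part of WaterDrop.
class ErrorSketch
  attr_reader :lines

  def initialize
    @lines = []
  end

  def on_error_occurred(event)
    error = event[:error]
    type = event[:type]

    # `error(...)` with arguments is a method call; `#{error}` reads the local variable.
    error(event, "Error occurred: #{error} - #{type}")
  end

  private

  # Assumed helper: stores the message instead of writing to a logger.
  def error(_event, log_message)
    @lines << log_message
  end
end

def error_sketch_line
  sketch = ErrorSketch.new
  sketch.on_error_occurred({ error: StandardError.new('boom'), type: 'example.type' })
  sketch.lines.first
end

puts error_sketch_line
# prints "Error occurred: boom - example.type"
```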

#on_message_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 75

def on_message_buffered(event)
  message = event[:message]

  info(event, "Buffering of a message to '#{message[:topic]}' topic")

  return unless log_messages?

  debug(event, [message])
end

#on_message_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 26

def on_message_produced_async(event)
  message = event[:message]

  info(event, "Message to '#{message[:topic]}' topic was delegated to a dispatch queue")

  return unless log_messages?

  debug(event, message)
end

#on_message_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 37

def on_message_produced_sync(event)
  message = event[:message]

  info(event, "Sync producing of a message to '#{message[:topic]}' topic")

  return unless log_messages?

  debug(event, message)
end

#on_messages_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 86

def on_messages_buffered(event)
  messages = event[:messages]

  info(event, "Buffering of #{messages.size} messages")

  return unless log_messages?

  debug(event, [messages, messages.size])
end

#on_messages_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 48

def on_messages_produced_async(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count

  info(
    event,
    "#{messages.size} messages to #{topics_count} topics were delegated to a dispatch queue"
  )

  return unless log_messages?

  debug(event, messages)
end
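
The `topics_count` expression counts distinct topics, not messages. A minimal sketch with made-up message hashes follows; the `distinct_topics_count` method name and the topic names are illustrative only.

```ruby
# Same expression as in the handler above: quote each topic name,
# drop duplicates, and count what is left.
def distinct_topics_count(messages)
  messages.map { |message| "'#{message[:topic]}'" }.uniq.count
end

# Illustrative messages; only the :topic key matters for the count.
messages = [
  { topic: 'orders', payload: 'a' },
  { topic: 'orders', payload: 'b' },
  { topic: 'payments', payload: 'c' }
]

puts "#{messages.size} messages to #{distinct_topics_count(messages)} topics were delegated to a dispatch queue"
# prints "3 messages to 2 topics were delegated to a dispatch queue"
```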

#on_messages_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 63

def on_messages_produced_sync(event)
  messages = event[:messages]
  topics_count = messages.map { |message| "'#{message[:topic]}'" }.uniq.count

  info(event, "Sync producing of #{messages.size} messages to #{topics_count} topics")

  return unless log_messages?

  debug(event, messages)
end

#on_producer_closed(event) ⇒ Object

Note:

Although the message text is “Closing producer”, the logged line includes the time taken, for example “Closing producer took 12 ms”, which indicates that the close already happened.

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 131

def on_producer_closed(event)
  info(event, 'Closing producer')
end
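
One way such a "took N ms" suffix could be produced is sketched below. This is a guess at the mechanism, not the library's code: the `info_with_duration` helper is hypothetical, and the sketch assumes the event exposes the elapsed milliseconds under a `:time` key.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper; the real private helper is not shown on this page.
# Assumes the event carries the elapsed time in milliseconds under :time.
def info_with_duration(logger, event, log_message)
  logger.info("#{log_message} took #{event[:time]} ms")
end

def closing_log_line(time_ms)
  io = StringIO.new
  logger = Logger.new(io)
  # Emit only the message text so the output is easy to read.
  logger.formatter = proc { |_severity, _time, _progname, msg| "#{msg}\n" }
  info_with_duration(logger, { time: time_ms }, 'Closing producer')
  io.string
end

puts closing_log_line(12)
# prints "Closing producer took 12 ms"
```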

#on_producer_closing(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 124

def on_producer_closing(event)
  info(event, 'Closing producer')
end

#on_producer_reloaded(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 136

def on_producer_reloaded(event)
  info(event, 'Producer successfully reloaded')
end

#on_transaction_aborted(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 154

def on_transaction_aborted(event)
  info(event, 'Aborting transaction')
end

#on_transaction_committed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 159

def on_transaction_committed(event)
  info(event, 'Committing transaction')
end

#on_transaction_finished(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 178

def on_transaction_finished(event)
  info(event, 'Processing transaction')
end

#on_transaction_marked_as_consumed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 164

def on_transaction_marked_as_consumed(event)
  message = event[:message]
  topic = message.topic
  partition = message.partition
  offset = message.offset
  loc = "#{topic}/#{partition}"

  info(
    event,
    "Marking message with offset #{offset} for topic #{loc} as consumed in a transaction"
  )
end
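
Unlike the produce and buffer handlers, which read `message[:topic]` from a hash, this handler calls reader methods on the message. The sketch below reproduces only the string formatting, using a hypothetical `ConsumedMessage` struct as a stand-in for the consumed message; the values are made up.

```ruby
# Stand-in for a consumed message; the handler only needs these three readers.
ConsumedMessage = Struct.new(:topic, :partition, :offset)

# Builds the same text as the handler above, without any logging.
def consumed_log_line(message)
  loc = "#{message.topic}/#{message.partition}"
  "Marking message with offset #{message.offset} for topic #{loc} as consumed in a transaction"
end

puts consumed_log_line(ConsumedMessage.new('orders', 3, 42))
# prints "Marking message with offset 42 for topic orders/3 as consumed in a transaction"
```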

#on_transaction_started(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 149

def on_transaction_started(event)
  info(event, 'Starting transaction')
end