Class: WaterDrop::Instrumentation::LoggerListener

Inherits:
Object
  • Object
show all
Defined in:
lib/waterdrop/instrumentation/logger_listener.rb

Overview

Note:

It is a module so that it can be used both as part of the Karafka framework listener and standalone.

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the WaterDrop flow.

Instance Method Summary collapse

Constructor Details

#initialize(logger, log_messages: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • logger (Object)

    logger we want to use

  • log_messages (Boolean) (defaults to: true)

    Whether to report the message content (payload and metadata) with each message operation.

    This can be excessive, especially when producing a lot of messages. We provide this option despite the fact that we only report payloads at the debug level, because Rails by default operates at the debug level. This means that when working with Rails in development, every single dispatched payload will go to the logs. In the majority of cases this is excessive and simply floods the end user.



20
21
22
23
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 20

# Stores the logger and the message-reporting flag on the listener.
#
# @param logger [Object] logger we want to use
# @param log_messages [Boolean] whether message content should be reported
#   alongside each message operation (defaults to true)
def initialize(logger, log_messages: true)
  @log_messages = log_messages
  @logger = logger
end

Instance Method Details

#on_buffer_flushed_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



94
95
96
97
98
99
100
101
102
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 94

# Handles the async buffer flush event: always emits an info entry with the
# number of flushed messages and, when message logging is enabled, passes the
# messages themselves to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_flushed_async(event)
  flushed = event[:messages]

  info(event, "Async flushing of #{flushed.size} messages from the buffer")

  # Message content is reported only when the listener was told to do so
  debug(event, flushed) if log_messages?
end

#on_buffer_flushed_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



105
106
107
108
109
110
111
112
113
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 105

# Handles the sync buffer flush event: always emits an info entry with the
# number of flushed messages and, when message logging is enabled, passes the
# messages themselves to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_flushed_sync(event)
  flushed = event[:messages]

  info(event, "Sync flushing of #{flushed.size} messages from the buffer")

  # Message content is reported only when the listener was told to do so
  debug(event, flushed) if log_messages?
end

#on_buffer_purged(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



116
117
118
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 116

# Emits an info entry for the buffer purged event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_buffer_purged(event)
  note = 'Successfully purging buffer'
  info(event, note)
end

#on_error_occurred(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the error details



126
127
128
129
130
131
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 126

# Emits an error entry that contains the error and its type, both read from
# the event.
#
# @param event [Dry::Events::Event] event that happened with the error details
def on_error_occurred(event)
  error(event, "Error occurred: #{event[:error]} - #{event[:type]}")
end

#on_message_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



72
73
74
75
76
77
78
79
80
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 72

# Handles the single message buffered event: always emits an info entry with
# the target topic and, when message logging is enabled, passes the message
# (wrapped in a one-element array) to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_buffered(event)
  buffered = event[:message]

  info(event, "Buffering of a message to '#{buffered[:topic]}' topic")

  # Message content is reported only when the listener was told to do so
  debug(event, [buffered]) if log_messages?
end

#on_message_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



26
27
28
29
30
31
32
33
34
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 26

# Handles the async single message produced event: always emits an info entry
# with the target topic and, when message logging is enabled, passes the
# message itself to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_produced_async(event)
  produced = event[:message]

  info(event, "Async producing of a message to '#{produced[:topic]}' topic")

  # Message content is reported only when the listener was told to do so
  debug(event, produced) if log_messages?
end

#on_message_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



37
38
39
40
41
42
43
44
45
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 37

# Handles the sync single message produced event: always emits an info entry
# with the target topic and, when message logging is enabled, passes the
# message itself to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_message_produced_sync(event)
  produced = event[:message]

  info(event, "Sync producing of a message to '#{produced[:topic]}' topic")

  # Message content is reported only when the listener was told to do so
  debug(event, produced) if log_messages?
end

#on_messages_buffered(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



83
84
85
86
87
88
89
90
91
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 83

# Handles the multiple messages buffered event: always emits an info entry
# with the number of messages and, when message logging is enabled, passes a
# two-element array (the messages and their count) to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_buffered(event)
  buffered = event[:messages]

  info(event, "Buffering of #{buffered.size} messages")

  # Message content is reported only when the listener was told to do so
  debug(event, [buffered, buffered.size]) if log_messages?
end

#on_messages_produced_async(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



48
49
50
51
52
53
54
55
56
57
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 48

# Handles the async multiple messages produced event: always emits an info
# entry with the number of messages and the number of distinct topics and,
# when message logging is enabled, passes the messages to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_produced_async(event)
  produced = event[:messages]
  # Topics are compared by their string form, so :a and 'a' count as one topic
  distinct_topics = produced.uniq { |msg| msg[:topic].to_s }.size

  info(event, "Async producing of #{produced.size} messages to #{distinct_topics} topics")

  debug(event, produced) if log_messages?
end

#on_messages_produced_sync(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



60
61
62
63
64
65
66
67
68
69
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 60

# Handles the sync multiple messages produced event: always emits an info
# entry with the number of messages and the number of distinct topics and,
# when message logging is enabled, passes the messages to the debug helper.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_messages_produced_sync(event)
  produced = event[:messages]
  # Topics are compared by their string form, so :a and 'a' count as one topic
  distinct_topics = produced.uniq { |msg| msg[:topic].to_s }.size

  info(event, "Sync producing of #{produced.size} messages to #{distinct_topics} topics")

  debug(event, produced) if log_messages?
end

#on_producer_closed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



121
122
123
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 121

# Emits an info entry for the producer closed event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_producer_closed(event)
  note = 'Closing producer'
  info(event, note)
end

#on_transaction_aborted(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



139
140
141
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 139

# Emits an info entry for the transaction aborted event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_aborted(event)
  note = 'Aborting transaction'
  info(event, note)
end

#on_transaction_committed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



144
145
146
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 144

# Emits an info entry for the transaction committed event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_committed(event)
  note = 'Committing transaction'
  info(event, note)
end

#on_transaction_finished(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



163
164
165
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 163

# Emits an info entry for the transaction finished event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_finished(event)
  note = 'Processing transaction'
  info(event, note)
end

#on_transaction_marked_as_consumed(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 149

# Emits an info entry stating which message (by offset and topic/partition)
# was marked as consumed in a transaction.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_marked_as_consumed(event)
  consumed = event[:message]
  # Location is rendered as "topic/partition"
  location = "#{consumed.topic}/#{consumed.partition}"
  note = "Marking message with offset #{consumed.offset} for topic #{location} " \
         'as consumed in a transaction'

  info(event, note)
end

#on_transaction_started(event) ⇒ Object

Parameters:

  • event (Dry::Events::Event)

    event that happened with the details



134
135
136
# File 'lib/waterdrop/instrumentation/logger_listener.rb', line 134

# Emits an info entry for the transaction started event.
#
# @param event [Dry::Events::Event] event that happened with the details
def on_transaction_started(event)
  note = 'Starting transaction'
  info(event, note)
end