Class: WaterDrop::Instrumentation::Callbacks::Delivery

Inherits:
Object
Defined in:
lib/waterdrop/instrumentation/callbacks/delivery.rb

Overview

Note:

We don’t have to provide client_name here, because this callback is created per client instance.

Note:

We do not treat message.purge as an error for transactional producers, because purging is standard behaviour for not-yet-dispatched messages in aborted transactions. We do, however, still instrument it for traceability.

Creates a callable that runs on each message delivery or delivery failure.

Instance Method Summary

Constructor Details

#initialize(producer_id, transactional, monitor) ⇒ Delivery

Returns a new instance of Delivery.

Parameters:

  • producer_id (String)

    id of the current producer

  • transactional (Boolean)

whether this callback belongs to a transactional or a regular producer

  • monitor (WaterDrop::Instrumentation::Monitor)

    monitor we are using



# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 28

def initialize(producer_id, transactional, monitor)
  @producer_id = producer_id
  @transactional = transactional
  @monitor = monitor
end

Instance Method Details

#call(delivery_report) ⇒ Object

Emits delivery details to the monitor

Parameters:

  • delivery_report (Rdkafka::Producer::DeliveryReport)

    delivery report



# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 36

def call(delivery_report)
  error_code = delivery_report.error.to_i

  if error_code.zero?
    instrument_acknowledged(delivery_report)
  elsif @transactional && PURGE_ERRORS.include?(error_code)
    instrument_purged(delivery_report)
  else
    instrument_error(delivery_report)
  end
# This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute
# crashes even if the instrumentation code fails. If it would bubble-up, it could crash
# the rdkafka background thread
rescue StandardError => e
  @monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    producer_id: @producer_id,
    type: 'callbacks.delivery.error'
  )
end
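
The three-way branch in #call, together with the rescue guard, can be sketched without Kafka or the WaterDrop gem. The snippet below is only an illustration under stated assumptions: FakeReport, RecordingMonitor and DeliverySketch are hypothetical stand-ins, the purge code -152 is assumed to be the value behind PURGE_ERRORS, and the event names other than 'error.occurred' are assumed rather than taken from this page.

```ruby
# Hypothetical stand-ins; none of these classes come from WaterDrop or rdkafka.
FakeReport = Struct.new(:error)

# Records every instrumented event so the routing can be inspected afterwards.
class RecordingMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, **payload)
    @events << [name, payload]
  end
end

# Mirrors the branching of Delivery#call in a simplified form.
class DeliverySketch
  # Assumption: -152 is librdkafka's "purged from queue" error code.
  PURGE_ERRORS = [-152].freeze

  def initialize(producer_id, transactional, monitor)
    @producer_id = producer_id
    @transactional = transactional
    @monitor = monitor
  end

  def call(report)
    # nil.to_i is 0, so a report without an error code counts as a success
    code = report.error.to_i

    event =
      if code.zero?
        'message.acknowledged' # assumed event name
      elsif @transactional && PURGE_ERRORS.include?(code)
        'message.purged'       # assumed event name
      else
        'error.occurred'
      end

    @monitor.instrument(event, producer_id: @producer_id, error_code: code)
  rescue StandardError => e
    # Never let a failure escape into the thread that invoked the callback
    @monitor.instrument(
      'error.occurred',
      error: e,
      producer_id: @producer_id,
      type: 'callbacks.delivery.error'
    )
  end
end

monitor = RecordingMonitor.new
regular = DeliverySketch.new('producer-1', false, monitor)
transactional = DeliverySketch.new('producer-2', true, monitor)

regular.call(FakeReport.new(0))          # successful delivery
transactional.call(FakeReport.new(-152)) # purge on a transactional producer
regular.call(FakeReport.new(-152))       # same code on a regular producer

monitor.events.each { |name, payload| puts "#{name} (code: #{payload[:error_code]})" }
```

In this sketch the same purge code is routed to a purge event only when the callback was built for a transactional producer; a regular producer reports it as an error. Any exception raised while handling the report is converted into an 'error.occurred' event instead of propagating to the caller.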