Class: WaterDrop::Instrumentation::Callbacks::Delivery

Inherits:
Object
  • Object
show all
Defined in:
lib/waterdrop/instrumentation/callbacks/delivery.rb

Overview

Note:

We don’t have to provide client_name here as this callback is per client instance

Note:

We do not consider message.purge as an error for transactional producers, because this is a standard behaviour for not yet dispatched messages on aborted transactions. We do however still want to instrument it for traceability.

Creates a callable that we want to run upon each message delivery or failure

Instance Method Summary collapse

Constructor Details

#initialize(producer_id, transactional, monitor) ⇒ Delivery

Returns a new instance of Delivery.

Parameters:

  • producer_id (String)

    id of the current producer

  • transactional (Boolean)

    is this handle for a transactional or regular producer

  • monitor (WaterDrop::Instrumentation::Monitor)

    monitor we are using



28
29
30
31
32
# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 28

def initialize(producer_id, transactional, monitor)
  @producer_id = producer_id
  @transactional = transactional
  @monitor = monitor
end

Instance Method Details

#call(delivery_report) ⇒ Object

Emits delivery details to the monitor

Parameters:

  • delivery_report (Rdkafka::Producer::DeliveryReport)

    delivery report



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 36

def call(delivery_report)
  error_code = delivery_report.error.to_i

  if error_code.zero?
    instrument_acknowledged(delivery_report)

  elsif @transactional && PURGE_ERRORS.include?(error_code)
    instrument_purged(delivery_report)
  else
    instrument_error(delivery_report)
  end
end