Class: WaterDrop::Instrumentation::Callbacks::Delivery
- Inherits:
-
Object
- Object
- WaterDrop::Instrumentation::Callbacks::Delivery
- Defined in:
- lib/waterdrop/instrumentation/callbacks/delivery.rb
Overview
Note:
We don’t have to provide client_name here as this callback is per client instance
Note:
We do not consider message.purge
as an error for transactional producers, because this is a standard behaviour for not yet dispatched messages on aborted transactions. We do however still want to instrument it for traceability.
Creates a callable that we want to run upon each message delivery or failure
Instance Method Summary collapse
-
#call(delivery_report) ⇒ Object
Emits delivery details to the monitor.
-
#initialize(producer_id, transactional, monitor) ⇒ Delivery
constructor
A new instance of Delivery.
Constructor Details
#initialize(producer_id, transactional, monitor) ⇒ Delivery
Returns a new instance of Delivery.
28 29 30 31 32 |
# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 28 def initialize(producer_id, transactional, monitor) @producer_id = producer_id @transactional = transactional @monitor = monitor end |
Instance Method Details
#call(delivery_report) ⇒ Object
Emits delivery details to the monitor
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/waterdrop/instrumentation/callbacks/delivery.rb', line 36 def call(delivery_report) error_code = delivery_report.error.to_i if error_code.zero? instrument_acknowledged(delivery_report) elsif @transactional && PURGE_ERRORS.include?(error_code) instrument_purged(delivery_report) else instrument_error(delivery_report) end # This runs from the rdkafka thread, thus we want to safe-guard it and prevent absolute # crashes even if the instrumentation code fails. If it would bubble-up, it could crash # the rdkafka background thread rescue StandardError => e @monitor.instrument( 'error.occurred', caller: self, error: e, producer_id: @producer_id, type: 'callbacks.delivery.error' ) end |