Module: WaterDrop::Producer::Sync
- Included in:
- WaterDrop::Producer
- Defined in:
- lib/waterdrop/producer/sync.rb
Overview
Component for synchronous producer operations
Instance Method Summary collapse
-
#produce_many_sync(messages) ⇒ Array<Rdkafka::Producer::DeliveryReport>
Produces many messages to Kafka and waits for them to be delivered.
-
#produce_sync(message) ⇒ Rdkafka::Producer::DeliveryReport
Produces a message to Kafka and waits for it to be delivered.
Instance Method Details
#produce_many_sync(messages) ⇒ Array<Rdkafka::Producer::DeliveryReport>
Produces many messages to Kafka and waits for them to be delivered
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/waterdrop/producer/sync.rb', line 59 def produce_many_sync() = middleware.run_many() .each { || () } dispatched = [] inline_error = nil @monitor.instrument('messages.produced_sync', producer_id: id, messages: ) do # While most of the librdkafka errors are async and not inline, there are some like # buffer overflow that can leak in during the `#produce` itself. When this happens, we # still (since it's a sync mode) need to wait on deliveries of things that were # successfully delegated to librdkafka. To do this, we catch the inline error and we # wait on messages that were in the buffer to reach final state. Then if no error, we # check each for error and if none all good. If there was an inline error, we re-raise # it with the handlers in final states. # # Such flow ensures, that we never end up with handlers not being in the final states # for the sync flow begin with_transaction_if_transactional do .each do || dispatched << produce() end end rescue *SUPPORTED_FLOW_ERRORS => e inline_error = e end # This will ensure, that we have all verdicts before raising the failure, so we pass # all delivery handles having a final verdict dispatched.each { |handler| wait(handler, raise_response_error: false) } raise(inline_error) if inline_error # This will raise an error on the first error that have happened dispatched.each { |handler| wait(handler) } dispatched end rescue *SUPPORTED_FLOW_ERRORS => e re_raised = Errors::ProduceManyError.new(dispatched, e.inspect) @monitor.instrument( 'error.occurred', producer_id: id, messages: , # If it is a transactional producer nothing was successfully dispatched on error, thus # we never return any dispatched handlers. While those messages might have reached # Kafka, in transactional mode they will not be visible to consumers with correct # isolation level. dispatched: transactional? ? EMPTY_ARRAY : dispatched, error: re_raised, type: 'messages.produce_many_sync' ) raise re_raised end |
#produce_sync(message) ⇒ Rdkafka::Producer::DeliveryReport
Produces a message to Kafka and waits for it to be delivered
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/waterdrop/producer/sync.rb', line 18 def produce_sync() = middleware.run() () @monitor.instrument( 'message.produced_sync', producer_id: id, message: ) do wait(produce()) end rescue *SUPPORTED_FLOW_ERRORS => e # We use this syntax here because we want to preserve the original `#cause` when we # instrument the error and there is no way to manually assign `#cause` value begin raise Errors::ProduceError, e.inspect rescue Errors::ProduceError => ex @monitor.instrument( 'error.occurred', producer_id: id, message: , error: ex, type: 'message.produce_sync' ) raise ex end end |