Module: WaterDrop::Producer::Transactions
- Included in: WaterDrop::Producer
- Defined in: lib/waterdrop/producer/transactions.rb
Overview
Transaction-related producer functionality.
Instance Method Summary
- #transaction ⇒ Object
  Creates a transaction.
- #transaction? ⇒ Boolean
  True if we are in an active transaction.
- #transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
  Marks given message as consumed inside of a transaction.
- #transactional? ⇒ Boolean
  True if this producer is a transactional one.
Instance Method Details
#transaction ⇒ Object
Creates a transaction.
Karafka transactions work in a similar manner to SQL database transactions, though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is that messages from within a single transaction can be delivered and will have a delivery handle, but will then be compacted prior to moving the LSO (last stable offset) forward. This means that not every delivery handle for async dispatches will emit a queue purge error. Sync dispatches will emit none, because the delivery has already happened, but those messages will never be visible to transactional consumers.
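The all-or-nothing behaviour described above can be illustrated with a small in-memory sketch. Everything in it is a hypothetical stand-in: `SketchBroker` and `AbortSketch` are invented for this example, nothing talks to Kafka, and the real producer is considerably more involved. It only shows that messages may count as dispatched while never becoming visible to a transactional reader.

```ruby
# Hypothetical stand-ins for illustration only: this is not WaterDrop and nothing talks to Kafka.
class AbortSketch < StandardError; end # plays the role of WaterDrop::Errors::AbortTransaction

class SketchBroker
  attr_reader :delivered, :visible

  def initialize
    @delivered = [] # every message that was dispatched, committed or not
    @visible = []   # what a transactional consumer would be able to read
  end

  def transaction
    pending = []
    produce = lambda do |message|
      @delivered << message
      pending << message
    end

    result = yield(produce)
    @visible.concat(pending) # commit: the whole batch becomes visible together
    result
  rescue AbortSketch
    nil # requested abort: the error is swallowed and nothing becomes visible
  end
end

broker = SketchBroker.new
broker.transaction { |produce| %w[a b].each { |m| produce.call(m) } }
broker.transaction do |produce|
  produce.call('c')
  raise AbortSketch
end

broker.delivered # => ["a", "b", "c"]
broker.visible   # => ["a", "b"]
```

Any other exception raised inside the block propagates to the caller in this sketch, and the pending messages are dropped in the same way.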
Transactions are thread-safe; however, they lock a mutex. This means that for high-throughput transactional message production in multiple threads (for example in Karafka), it may be much better to use a few producer instances that can work in parallel.
Please note that if a producer is configured as transactional, it cannot produce messages outside of transactions. That is why, by default, all dispatches are wrapped in a transaction: one transaction per single dispatch, while for produce_many a single transaction wraps all message dispatches (not one per message).
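As a rough illustration of this wrapping, the sketch below uses a hypothetical `SketchProducer` that records events in an in-memory log instead of talking to Kafka. Its method names mirror WaterDrop's `produce_sync` and `produce_many_sync`, but the bodies are assumptions made for the example, not the library's implementation. The re-entrancy check is the same `owned?` idea used by `#transaction`.

```ruby
# Hypothetical stand-in, NOT WaterDrop's implementation: it only mimics the
# wrapping behaviour, using an in-memory log instead of Kafka.
class SketchProducer
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @log = []
  end

  # Re-entrant: if this thread already holds the transaction, just run the block.
  def transaction
    return yield if @mutex.owned?

    @mutex.synchronize do
      @log << :begin
      result = yield
      @log << :commit
      result
    end
  end

  # A single dispatch gets its own transaction when none is open.
  def produce_sync(message)
    transaction { @log << [:message, message] }
  end

  # One transaction wraps the whole batch, not one per message.
  def produce_many_sync(messages)
    transaction { messages.each { |message| produce_sync(message) } }
  end
end

producer = SketchProducer.new
producer.produce_sync('a')
producer.produce_many_sync(%w[b c])

producer.log
# => [:begin, [:message, "a"], :commit, :begin, [:message, "b"], [:message, "c"], :commit]
```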
# File 'lib/waterdrop/producer/transactions.rb', line 57

def transaction
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:finished) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result = nil
      finished = false

      begin
        result = yield
        finished = true
      rescue Exception => e
        raise(e)
      ensure
        if !e && !finished
          raise(
            Errors::EarlyTransactionExitNotAllowedError,
            <<~ERROR_MSG.tr("\n", ' ')
              Using `return`, `break` or `throw` to exit a transaction block is not allowed.
              If the `throw` came from `Timeout.timeout(duration)`, pass an exception class as
              a second argument so it doesn't use `throw` to abort its block.
            ERROR_MSG
          )
        end
      end

      with_transactional_error_handling(:commit) do
        transactional_instrument(:committed) { client.commit_transaction }
      end

      result
    # We need to handle any interrupt including critical in order not to have the transaction
    # running. This will also handle things like `IRB::Abort`
    rescue Exception => e
      # This code is a bit tricky. We have an error and when it happens we try to rollback
      # the transaction. However we may end up in a state where transaction aborting itself
      # produces error. In such case we also want to handle it as fatal and reload client.
      # This is why we catch this here
      begin
        with_transactional_error_handling(:abort) do
          transactional_instrument(:aborted) do
            client.abort_transaction
          end
        end
      rescue StandardError => e
        # If something from rdkafka leaks here, it means there was a non-retryable error that
        # bubbled up. In such cases if we should, we do reload the underlying client
        transactional_reload_client_if_needed(e)

        raise
      end

      transactional_reload_client_if_needed(e)

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end
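The early-exit check in the method above relies on a `finished` flag combined with `ensure`. The standalone sketch below isolates that technique in plain Ruby. `EarlyExitSketchError` and `guard_against_early_exit` are hypothetical names chosen for the example; only the flag-plus-`ensure` pattern is taken from the source.

```ruby
# Hypothetical error class standing in for Errors::EarlyTransactionExitNotAllowedError.
class EarlyExitSketchError < StandardError; end

# Runs the block and raises if it was left via `return`, `break` or `throw`
# instead of finishing normally or raising an exception.
def guard_against_early_exit
  finished = false

  begin
    result = yield
    finished = true
    result
  rescue Exception => e
    raise e
  ensure
    # No exception and no normal completion means a non-local exit happened.
    raise EarlyExitSketchError, 'early exit is not allowed' if !e && !finished
  end
end

guard_against_early_exit { 1 + 1 } # => 2

begin
  guard_against_early_exit { break 5 }
rescue EarlyExitSketchError => error
  error.message # => "early exit is not allowed"
end
```

An exception raised inside the block sets `e`, so it passes through unchanged rather than being reported as an early exit.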
#transaction? ⇒ Boolean
Returns true if we are in an active transaction.
# File 'lib/waterdrop/producer/transactions.rb', line 123

def transaction?
  @transaction_mutex.owned?
end
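Because the check is built on `Mutex#owned?`, the answer depends on the calling thread: it is true only for the thread that currently holds the lock. The short sketch below shows this with a bare `Mutex` from Ruby core; the variable names are arbitrary and no producer is involved.

```ruby
mutex = Mutex.new

outside = mutex.owned?                       # => false, nobody holds the lock
inside = mutex.synchronize { mutex.owned? }  # => true, the current thread holds it

# A different thread does not own the lock, even while it is held elsewhere.
other_thread = mutex.synchronize { Thread.new { mutex.owned? }.value } # => false
```

Assuming the producer is shared across threads, this suggests that `transaction?` called from a thread that did not open the transaction would report false.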
#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
Marks given message as consumed inside of a transaction.
# File 'lib/waterdrop/producer/transactions.rb', line 140

def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  CONTRACT.validate!(
    {
      consumer: consumer,
      message: message,
      offset_metadata: offset_metadata
    },
    Errors::TransactionalOffsetInvalidError
  )

  details = { message: message, offset_metadata: offset_metadata }

  transactional_instrument(:marked_as_consumed, details) do
    tpl = Rdkafka::Consumer::TopicPartitionList.new
    partition = Rdkafka::Consumer::Partition.new(
      message.partition,
      # +1 because this is next offset from which we will start processing from
      message.offset + 1,
      0,
      offset_metadata
    )

    tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

    with_transactional_error_handling(:store_offset) do
      client.send_offsets_to_transaction(
        consumer,
        tpl,
        current_variant.max_wait_timeout
      )
    end
  end
end
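The `+ 1` in the method above means the stored position is the offset of the next message to process, not the offset of the message just handled. The plain-Ruby sketch below illustrates only that arithmetic. `SketchMessage` and `offset_to_store` are hypothetical names, and a hash stands in for the rdkafka topic partition list.

```ruby
# Hypothetical stand-in for a consumed message; real messages come from the consumer.
SketchMessage = Struct.new(:topic, :partition, :offset)

# Returns the position to store for the message: the offset of the NEXT message to process.
def offset_to_store(message)
  { topic: message.topic, partition: message.partition, offset: message.offset + 1 }
end

message = SketchMessage.new('events', 0, 41)
offset_to_store(message) # => { topic: "events", partition: 0, offset: 42 }
```

With a real producer, the call has to sit inside a `transaction` block, since the method raises `Errors::TransactionRequiredError` when no transaction is active.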
#transactional? ⇒ Boolean
Returns true if this producer is a transactional one.
# File 'lib/waterdrop/producer/transactions.rb', line 128

def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end
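The `instance_variable_defined?` guard matters because the cached value can be `false`: a plain `@transactional ||= ...` would repeat the lookup on every call for non-transactional producers. The sketch below reproduces the pattern with a hypothetical `SketchConfigProbe` class that takes a plain settings hash and counts lookups. It is an illustration under those assumptions, not the producer's configuration object.

```ruby
# Hypothetical stand-in that mimics the memoization pattern with a plain hash.
class SketchConfigProbe
  attr_reader :lookups

  def initialize(kafka_settings)
    @kafka_settings = kafka_settings
    @lookups = 0
  end

  def transactional?
    # Works for a cached `false` as well, unlike `||=`.
    return @transactional if instance_variable_defined?(:@transactional)

    @lookups += 1
    @transactional = @kafka_settings.key?(:'transactional.id')
  end
end

plain = SketchConfigProbe.new({ 'bootstrap.servers': 'localhost:9092' })
2.times { plain.transactional? }
plain.transactional? # => false
plain.lookups        # => 1

transactional = SketchConfigProbe.new(
  { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'sketch-id' }
)
transactional.transactional? # => true
```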