Module: WaterDrop::Producer::Transactions
- Included in: WaterDrop::Producer
- Defined in: lib/waterdrop/producer/transactions.rb
Overview
Transaction-related producer functionality.
Instance Method Summary
- #transaction ⇒ Object
  Creates a transaction.
- #transaction? ⇒ Boolean
  True if we are in an active transaction.
- #transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
  Marks given message as consumed inside of a transaction.
- #transactional? ⇒ Boolean
  Whether this producer is a transactional one.
- #transactional_retryable? ⇒ Boolean
  Checks if we can still retry reloading after a transactional fatal error.
Instance Method Details
#transaction ⇒ Object
Creates a transaction.
Karafka transactions work similarly to SQL database transactions, though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is that messages from within a single transaction can be delivered and will have a delivery handle, but will then be compacted prior to moving the LSO (last stable offset) forward. This means that not every delivery handle for async dispatches will emit a queue purge error, and none will for sync dispatches, because the delivery has already happened. Such messages will, however, never be visible to transactional consumers.
Transactions are thread-safe; however, they lock a mutex. This means that for high-throughput transactional message production from multiple threads (for example in Karafka), it may be much better to use a few producer instances that can work in parallel.
Please note that if a producer is configured as transactional, it cannot produce messages outside of transactions. That is why, by default, all dispatches are wrapped with a transaction: one transaction per single dispatch, and for produce_many a single transaction wrapping all message dispatches (not one per message).
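To make the wrapping rules above concrete, here is a minimal in-memory sketch. `SketchProducer` is a hypothetical stand-in, not WaterDrop's implementation: it only mimics the shape of `transaction`, `produce_async` and `produce_many_async`, buffers messages in an array, and "publishes" them on commit. It talks to no Kafka cluster.

```ruby
# Hypothetical in-memory stand-in that mimics the wrapping rules described above.
# It is NOT WaterDrop's implementation and talks to no Kafka cluster.
class SketchProducer
  attr_reader :committed, :commits

  def initialize
    @mutex = Mutex.new
    @buffer = []     # messages produced inside the open transaction
    @committed = []  # messages visible after a successful commit
    @commits = 0     # number of transactions committed so far
  end

  def transaction
    # Re-entrant: a nested call joins the transaction already open in this thread
    return yield if @mutex.owned?

    @mutex.synchronize do
      begin
        result = yield
        @committed.concat(@buffer)
        @commits += 1
        result
      ensure
        # Runs on both paths; after a failure it discards the uncommitted messages
        @buffer.clear
      end
    end
  end

  # A single dispatch outside of a block gets its own transaction
  def produce_async(message)
    transaction { @buffer << message }
  end

  # A batch is wrapped in one transaction, not one per message
  def produce_many_async(messages)
    transaction { messages.each { |message| @buffer << message } }
  end
end

producer = SketchProducer.new
producer.produce_async({ topic: 'events', payload: 'a' })
producer.produce_many_async([{ topic: 'events', payload: 'b' }, { topic: 'events', payload: 'c' }])

begin
  producer.transaction do
    producer.produce_async({ topic: 'events', payload: 'd' })
    raise 'boom'
  end
rescue RuntimeError
  # The failed transaction left nothing behind
end
```

After this runs, `producer.commits` is 2 (one for the single dispatch, one for the batch) and only the payloads `a`, `b` and `c` are in `producer.committed`; the message produced inside the failed block was discarded with it.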
    # File 'lib/waterdrop/producer/transactions.rb', line 51

    def transaction
      unless transactional?
        raise(
          Errors::ProducerNotTransactionalError,
          "#{id} is not transactional"
        )
      end

      # This will safely allow us to support one operation transactions so a transactional
      # producer can work without the transactional block if needed
      return yield if @transaction_mutex.owned?

      @transaction_mutex.synchronize do
        ensure_active!

        transactional_instrument(:finished) do
          with_transactional_error_handling(:begin) do
            transactional_instrument(:started) { client.begin_transaction }
          end

          result = nil
          finished = false

          begin
            result = yield
            finished = true
          rescue Exception => e
            raise(e)
          ensure
            if !e && !finished
              raise(
                Errors::EarlyTransactionExitNotAllowedError,
                <<~ERROR_MSG.tr("\n", ' ')
                  Using `return`, `break` or `throw` to exit a transaction block is not allowed.
                  If the `throw` came from `Timeout.timeout(duration)`, pass an exception class as
                  a second argument so it doesn't use `throw` to abort its block.
                ERROR_MSG
              )
            end
          end

          with_transactional_error_handling(:commit) do
            transactional_instrument(:committed) { client.commit_transaction }
          end

          # Reset attempts counter on successful transaction commit
          @transaction_fatal_error_attempts = 0

          result
        # We need to handle any interrupt including critical in order not to have the transaction
        # running. This will also handle things like `IRB::Abort`
        rescue Exception => e
          # This code is a bit tricky. We have an error and when it happens we try to rollback
          # the transaction. However we may end up in a state where transaction aborting itself
          # produces error. In such case we also want to handle it as fatal and reload client.
          # This is why we catch this here
          begin
            with_transactional_error_handling(:abort) do
              transactional_instrument(:aborted) do
                client.abort_transaction
              end
            end
          rescue StandardError => e
            # If something from rdkafka leaks here, it means there was a non-retryable error that
            # bubbled up. In such cases if we should, we do reload the underlying client
            transactional_reload_client_if_needed(e)

            raise
          end

          transactional_reload_client_if_needed(e)

          raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
        end
      end
    end
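The early-exit check in the listing above relies on a small Ruby idiom: a `finished` flag set right after `yield`, combined with an `ensure` clause that can tell a clean finish and a raised exception apart from a `break`, `return` or `throw`. The following standalone sketch isolates that idiom; `guarded` and `EarlyExitError` are hypothetical names used only here.

```ruby
# Hypothetical error class for this sketch only
EarlyExitError = Class.new(StandardError)

# Yields to the block and raises if the block was left without finishing or raising
def guarded
  finished = false

  begin
    result = yield
    finished = true
    result
  rescue Exception => e
    raise(e)
  ensure
    # `e` is nil and `finished` is false only when `break`, `return` or `throw`
    # unwound through the `yield`
    raise(EarlyExitError, 'early exit from the block') if !e && !finished
  end
end

outcomes = []

# Normal completion: the block's value is returned
outcomes << guarded { :ok }

# `break` skips `finished = true`, so the ensure clause raises
begin
  guarded { break }
rescue EarlyExitError
  outcomes << :early_exit
end

# A regular exception passes through untouched
begin
  guarded { raise ArgumentError, 'bad input' }
rescue ArgumentError
  outcomes << :original_error
end
```

Here `outcomes` ends up as `[:ok, :early_exit, :original_error]`. Rescuing `Exception` is normally discouraged; it is kept in the sketch only because the listing does the same in order to record that an error occurred.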
#transaction? ⇒ Boolean
Returns true if we are in an active transaction.
    # File 'lib/waterdrop/producer/transactions.rb', line 129

    def transaction?
      @transaction_mutex.owned?
    end
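Because the check is built on `Mutex#owned?`, the answer is specific to the calling thread: a thread that did not open the transaction sees `false` even while another thread is inside one. The snippet below illustrates this with a plain `Mutex` rather than a producer; the variable names are illustrative only.

```ruby
mutex = Mutex.new

# Nobody holds the lock yet
outside = mutex.owned?

inside = nil
other_thread = nil

mutex.synchronize do
  # The current thread holds the lock
  inside = mutex.owned?
  # A different thread does not, even though the lock is taken
  other_thread = Thread.new { mutex.owned? }.value
end
```

After this runs, `outside` is `false`, `inside` is `true` and `other_thread` is `false`.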
#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
Marks given message as consumed inside of a transaction.
    # File 'lib/waterdrop/producer/transactions.rb', line 153

    def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
      raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

      CONTRACT.validate!(
        { consumer: consumer, message: message, offset_metadata: offset_metadata },
        Errors::TransactionalOffsetInvalidError
      )

      details = { message: message, offset_metadata: offset_metadata }

      transactional_instrument(:marked_as_consumed, details) do
        tpl = Rdkafka::Consumer::TopicPartitionList.new
        partition = Rdkafka::Consumer::Partition.new(
          message.partition,
          # +1 because this is next offset from which we will start processing from
          message.offset + 1,
          0,
          offset_metadata
        )

        tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

        with_transactional_error_handling(:store_offset) do
          client.send_offsets_to_transaction(
            consumer,
            tpl,
            current_variant.max_wait_timeout
          )
        end
      end
    end
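Two rules stand out in the listing above: the call is only valid while the calling thread is inside a transaction, and the stored offset is the message offset plus one, that is, the next offset to process. The sketch below models just those two rules. `OffsetSketch`, `SketchMessage` and `TransactionRequired` are hypothetical names, and nothing here calls rdkafka.

```ruby
# Hypothetical names throughout; this models two rules visible in the listing
# above and does not call rdkafka.
SketchMessage = Struct.new(:topic, :partition, :offset)
TransactionRequired = Class.new(StandardError)

class OffsetSketch
  attr_reader :stored_offsets

  def initialize
    @mutex = Mutex.new
    @stored_offsets = {}
  end

  def transaction
    return yield if @mutex.owned?

    @mutex.synchronize { yield }
  end

  def mark_as_consumed(message)
    # Rule 1: only valid while the calling thread holds the transaction
    raise TransactionRequired unless @mutex.owned?

    # Rule 2: store the offset of the NEXT message to process, hence + 1
    @stored_offsets[[message.topic, message.partition]] = message.offset + 1
  end
end

sketch = OffsetSketch.new
message = SketchMessage.new('events', 0, 41)

# Outside of a transaction the call is rejected
rejected = begin
  sketch.mark_as_consumed(message)
  false
rescue TransactionRequired
  true
end

# Inside a transaction the next offset is recorded
sketch.transaction { sketch.mark_as_consumed(message) }
```

With a message at offset 41, `rejected` is `true` and `sketch.stored_offsets` is `{ ['events', 0] => 42 }`.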
#transactional? ⇒ Boolean
Returns whether this producer is a transactional one.
    # File 'lib/waterdrop/producer/transactions.rb', line 134

    def transactional?
      return @transactional unless @transactional.nil?

      @transactional = config.kafka.to_h.key?(:'transactional.id')
    end
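The explicit `nil?` check matters because `false` is a legitimate cached answer; with `||=` a non-transactional producer would repeat the lookup on every call. The sketch below shows the difference with a hypothetical `MemoSketch` class that counts lookups. The settings hashes and their values (`localhost:9092`, `my-producer`) are illustrative only.

```ruby
# Hypothetical stand-in that counts how often the settings lookup runs
class MemoSketch
  attr_reader :lookups

  def initialize(kafka_settings)
    @kafka_settings = kafka_settings
    @transactional = nil
    @lookups = 0
  end

  def transactional?
    # `false` is a valid cached answer, so test for nil instead of using `||=`
    return @transactional unless @transactional.nil?

    @lookups += 1
    @transactional = @kafka_settings.key?(:'transactional.id')
  end
end

plain = MemoSketch.new({ 'bootstrap.servers': 'localhost:9092' })
transactional = MemoSketch.new(
  { 'bootstrap.servers': 'localhost:9092', 'transactional.id': 'my-producer' }
)

plain_answers = Array.new(3) { plain.transactional? }
transactional_answer = transactional.transactional?
```

Here `plain_answers` is `[false, false, false]` while `plain.lookups` stays at 1, and `transactional_answer` is `true`.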
#transactional_retryable? ⇒ Boolean
Checks if we can still retry reloading after a transactional fatal error.
    # File 'lib/waterdrop/producer/transactions.rb', line 143

    def transactional_retryable?
      @transaction_fatal_error_attempts < config.max_attempts_on_transaction_fatal_error
    end
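The check amounts to a bounded retry budget: a counter compared against a configured limit and, as the `#transaction` source shows, reset to zero after a successful commit. A minimal sketch of that bookkeeping follows. `RetryBudgetSketch` and its method names are hypothetical, and where the real producer increments its counter is not shown on this page, so that step is an assumption.

```ruby
# Hypothetical stand-in; the point where the real producer increments its
# counter is not shown on this page and is assumed here.
class RetryBudgetSketch
  def initialize(max_attempts)
    @max_attempts = max_attempts
    @attempts = 0
  end

  def retryable?
    @attempts < @max_attempts
  end

  # Assumed: one attempt is consumed per reload after a fatal error
  def record_fatal_error
    @attempts += 1
  end

  # Mirrors the reset performed after a successful commit
  def record_commit
    @attempts = 0
  end
end

budget = RetryBudgetSketch.new(2)
states = [budget.retryable?]

2.times { budget.record_fatal_error }
states << budget.retryable?

budget.record_commit
states << budget.retryable?
```

With a limit of 2, `states` is `[true, false, true]`: the budget is exhausted after two fatal errors and restored by a successful commit.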