Module: WaterDrop::Producer::Transactions
- Included in:
- WaterDrop::Producer
- Defined in:
- lib/waterdrop/producer/transactions.rb
Overview
Transactions related producer functionalities
Instance Method Summary collapse
-
#transaction ⇒ Object
Creates a transaction.
-
#transaction? ⇒ Boolean
True if we are in an active transaction.
-
#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
Marks given message as consumed inside of a transaction.
-
#transactional? ⇒ Boolean
Is this producer a transactional one.
Instance Method Details
#transaction ⇒ Object
Creates a transaction.
Karafka transactions work in a similar manner to SQL db transactions though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is, that messages from within a single transaction can be delivered and will have a delivery handle but will be then compacted prior to moving the LSO forward. This means, that not every delivery handle for async dispatches will emit a queue purge error. None for sync as the delivery has happened but they will never be visible by the transactional consumers.
Transactions are thread-safe however they lock a mutex. This means, that for high-throughput transactional messages production in multiple threads (for example in Karafka), it may be much better to use few instances that can work in parallel.
Please note, that if a producer is configured as transactional, it cannot produce messages outside of transactions, that is why by default all dispatches will be wrapped with a transaction. One transaction per single dispatch and for produce_many
it will be a single transaction wrapping all messages dispatches (not one per message).
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/waterdrop/producer/transactions.rb', line 51 def transaction # This will safely allow us to support one operation transactions so a transactional # producer can work without the transactional block if needed return yield if @transaction_mutex.owned? @transaction_mutex.synchronize do transactional_instrument(:finished) do with_transactional_error_handling(:begin) do transactional_instrument(:started) { client.begin_transaction } end result = nil commit = false catch(:abort) do result = yield commit = true end commit || raise(WaterDrop::Errors::AbortTransaction) with_transactional_error_handling(:commit) do transactional_instrument(:committed) { client.commit_transaction } end result # We need to handle any interrupt including critical in order not to have the transaction # running. This will also handle things like `IRB::Abort` # # rubocop:disable Lint/RescueException rescue Exception => e # rubocop:enable Lint/RescueException with_transactional_error_handling(:abort) do transactional_instrument(:aborted) { client.abort_transaction } end raise unless e.is_a?(WaterDrop::Errors::AbortTransaction) end end end |
#transaction? ⇒ Boolean
Returns true if we are in an active transaction.
93 94 95 |
# File 'lib/waterdrop/producer/transactions.rb', line 93 def transaction? @transaction_mutex.owned? end |
#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
Marks given message as consumed inside of a transaction.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/waterdrop/producer/transactions.rb', line 110 def transaction_mark_as_consumed(consumer, , = nil) raise Errors::TransactionRequiredError unless @transaction_mutex.owned? CONTRACT.validate!( { consumer: consumer, message: , offset_metadata: }, Errors::TransactionalOffsetInvalidError ) details = { message: , offset_metadata: } transactional_instrument(:marked_as_consumed, details) do tpl = Rdkafka::Consumer::TopicPartitionList.new partition = Rdkafka::Consumer::Partition.new( .partition, # +1 because this is next offset from which we will start processing from .offset + 1, 0, ) tpl.add_topic_and_partitions_with_offsets(.topic, [partition]) with_transactional_error_handling(:store_offset) do client.send_offsets_to_transaction( consumer, tpl, @config.max_wait_timeout ) end end end |
#transactional? ⇒ Boolean
Returns Is this producer a transactional one.
98 99 100 101 102 |
# File 'lib/waterdrop/producer/transactions.rb', line 98 def transactional? return @transactional if instance_variable_defined?(:'@transactional') @transactional = config.kafka.to_h.key?(:'transactional.id') end |