Module: WaterDrop::Producer::Transactions

Included in:
WaterDrop::Producer
Defined in:
lib/waterdrop/producer/transactions.rb

Overview

Transaction-related producer functionality.

Instance Method Summary

Instance Method Details

#transaction ⇒ Object

Creates a transaction.

Karafka transactions work similarly to SQL database transactions, with some crucial differences. All messages produced within a transaction are delivered together or fail together. The difference is that, when a transaction is aborted, its messages may already have been delivered and have a delivery handle; they are then compacted before the LSO (last stable offset) moves forward. As a result, not every delivery handle for an async dispatch will report a queue purge error, and none will for sync dispatches, because the delivery has already happened. Such messages are nevertheless never visible to transactional consumers.

Transactions are thread-safe, but they lock a mutex. For high-throughput transactional message production from multiple threads (for example in Karafka), it may be much better to use a few producer instances that can work in parallel.

Please note that a producer configured as transactional cannot produce messages outside of transactions. This is why, by default, every dispatch is wrapped in a transaction: one transaction per single dispatch, and for produce_many a single transaction wrapping all message dispatches (not one per message).
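
The all-or-nothing visibility and the automatic wrapping of single dispatches described above can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: `ToyLog` and `ToyAbort` are hypothetical names, nothing here talks to Kafka, and nested transactions are not modelled.

```ruby
# Hypothetical stand-in for WaterDrop::AbortTransaction (not the real class)
class ToyAbort < StandardError; end

# Toy in-memory log: staged messages become visible only when the block finishes
class ToyLog
  attr_reader :visible

  def initialize
    @visible = []
    @staged = nil
  end

  # Outside of a transaction, a single dispatch is wrapped in its own transaction
  def produce(payload)
    return transaction { produce(payload) } unless @staged

    @staged << payload
    payload
  end

  def transaction
    @staged = []
    result = yield
    # "Commit": everything staged becomes visible at once
    @visible.concat(@staged)
    result
  rescue ToyAbort
    # "Abort": staged messages are dropped and nil is returned
    nil
  ensure
    @staged = nil
  end
end

log = ToyLog.new
log.transaction do
  log.produce('a')
  log.produce('b')
end
log.transaction do
  log.produce('c')
  raise ToyAbort
end
p log.visible
```

Any other exception raised in the block also discards the staged messages, but it propagates to the caller instead of being swallowed.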

Examples:

Simple transaction

producer.transaction do
  producer.produce_async(topic: 'topic', payload: 'data')
end

Aborted transaction - produced messages won't be visible to consumers

producer.transaction do
  producer.produce_sync(topic: 'topic', payload: 'data')
  raise WaterDrop::AbortTransaction
end

Use the block result (the last delivery handle) to wait until all messages are acknowledged

handler = producer.transaction do
            producer.produce_async(topic: 'topic', payload: 'data')
          end

handler.wait

Returns:

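  • Block result, or nil when the transaction is aborted with AbortTransaction. Exiting the block early with return, break or throw raises Errors::EarlyTransactionExitNotAllowedError.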



# File 'lib/waterdrop/producer/transactions.rb', line 57

def transaction
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:finished) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result = nil
      finished = false

      begin
        result = yield
        finished = true
      rescue Exception => e
        raise(e)
      ensure
        if !e && !finished
          raise(
            Errors::EarlyTransactionExitNotAllowedError,
            <<~ERROR_MSG.tr("\n", ' ')
              Using `return`, `break` or `throw` to exit a transaction block is not allowed.
              If the `throw` came from `Timeout.timeout(duration)`, pass an exception class as
              a second argument so it doesn't use `throw` to abort its block.
            ERROR_MSG
          )
        end
      end

      with_transactional_error_handling(:commit) do
        transactional_instrument(:committed) { client.commit_transaction }
      end

      result
    # We need to handle any interrupt including critical in order not to have the transaction
    # running. This will also handle things like `IRB::Abort`
    rescue Exception => e
      # This code is a bit tricky. We have an error and when it happens we try to rollback
      # the transaction. However we may end up in a state where transaction aborting itself
      # produces error. In such case we also want to handle it as fatal and reload client.
      # This is why we catch this here
      begin
        with_transactional_error_handling(:abort) do
          transactional_instrument(:aborted) do
            client.abort_transaction
          end
        end
      rescue StandardError => e
        # If something from rdkafka leaks here, it means there was a non-retryable error that
        # bubbled up. In such cases, if we should, we reload the underlying client
        transactional_reload_client_if_needed(e)

        raise
      end

      transactional_reload_client_if_needed(e)

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end
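
The `finished` flag combined with `ensure` is what lets the method above notice a block that was left through `return`, `break` or `throw`. A minimal standalone sketch of that pattern follows; `guarded_block` and `EarlyExitError` are hypothetical names used only for illustration and are not part of WaterDrop.

```ruby
# Hypothetical stand-in for Errors::EarlyTransactionExitNotAllowedError
class EarlyExitError < StandardError; end

# Yields to the block and raises if the block was left without finishing or raising
def guarded_block
  finished = false
  error = nil

  begin
    result = yield
    finished = true
    result
  rescue Exception => error
    # Regular errors are re-raised untouched
    raise
  ensure
    # No exception and no normal completion means a non-local exit
    # (break, return or throw) unwound the block
    raise EarlyExitError, 'early exit is not allowed' if !error && !finished
  end
end

begin
  guarded_block { break }
rescue EarlyExitError => e
  puts "detected: #{e.message}"
end
```

Raising from `ensure` replaces the pending non-local exit, so the caller sees the error rather than a silently abandoned block.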

#transaction? ⇒ Boolean

Returns true if we are in an active transaction.

Returns:

  • (Boolean)

    true if we are in an active transaction



# File 'lib/waterdrop/producer/transactions.rb', line 123

def transaction?
  @transaction_mutex.owned?
end
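
Both `#transaction?` and the early `return yield` in `#transaction` rely on `Mutex#owned?`, which is true only for the thread currently holding the lock. The sketch below shows that guard in isolation, assuming a hypothetical `ReentrantSection` class that is not part of WaterDrop.

```ruby
# Hypothetical class illustrating a mutex-guarded section that tolerates nesting
class ReentrantSection
  def initialize
    @mutex = Mutex.new
  end

  # True only when the calling thread holds the lock
  def active?
    @mutex.owned?
  end

  def run
    # Already inside the section on this thread: run the block directly,
    # because locking the same Mutex again would raise ThreadError
    return yield if @mutex.owned?

    @mutex.synchronize { yield }
  end
end

section = ReentrantSection.new
section.run do
  section.run { puts "nested, active: #{section.active?}" }
end
```

Because ownership is tracked per thread, another thread calling `active?` while the section is running gets `false` and has to wait for the lock in `run`.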

#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object

Marks given message as consumed inside of a transaction.

Parameters:

  • consumer (#consumer_group_metadata_pointer)

    any consumer from which we can obtain the librdkafka consumer group metadata pointer

  • message (Karafka::Messages::Message)

    Karafka message

  • offset_metadata (String) (defaults to: nil)

    offset metadata or nil if none

Raises:

  • (Errors::TransactionRequiredError)

    when called outside of an active transaction



# File 'lib/waterdrop/producer/transactions.rb', line 140

def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  CONTRACT.validate!(
    {
      consumer: consumer,
      message: message,
      offset_metadata: offset_metadata
    },
    Errors::TransactionalOffsetInvalidError
  )

  details = { message: message, offset_metadata: offset_metadata }

  transactional_instrument(:marked_as_consumed, details) do
    tpl = Rdkafka::Consumer::TopicPartitionList.new
    partition = Rdkafka::Consumer::Partition.new(
      message.partition,
      # +1 because this is the next offset from which we will start processing
      message.offset + 1,
      0,
      offset_metadata
    )

    tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

    with_transactional_error_handling(:store_offset) do
      client.send_offsets_to_transaction(
        consumer,
        tpl,
        current_variant.max_wait_timeout
      )
    end
  end
end
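
The `message.offset + 1` above follows the Kafka convention that a stored offset names the next message to process, not the last one processed. A tiny sketch of that arithmetic, using a hypothetical `ToyMessage` struct and `offset_to_store` helper rather than real Karafka objects:

```ruby
# Hypothetical stand-in for a consumed message; only the fields used below
ToyMessage = Struct.new(:topic, :partition, :offset, keyword_init: true)

# Returns the offset to store: the position of the next message to process
def offset_to_store(message)
  message.offset + 1
end

message = ToyMessage.new(topic: 'events', partition: 0, offset: 41)
puts offset_to_store(message) # prints 42
```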

#transactional? ⇒ Boolean

Returns true if this producer is a transactional one.

Returns:

  • (Boolean)

    true if this producer is a transactional one



# File 'lib/waterdrop/producer/transactions.rb', line 128

def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end
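
The method above memoizes with `instance_variable_defined?` rather than `||=`, which matters because a cached `false` would otherwise be recomputed on every call. A standalone sketch of the difference, assuming a hypothetical `ToyProducer` class with a `lookups` counter added purely for illustration:

```ruby
# Hypothetical class; `lookups` counts how often the config is actually inspected
class ToyProducer
  attr_reader :lookups

  def initialize(kafka)
    @kafka = kafka
    @lookups = 0
  end

  def transactional?
    # Works for cached false as well, unlike `@transactional ||= ...`
    return @transactional if instance_variable_defined?(:@transactional)

    @lookups += 1
    @transactional = @kafka.key?(:'transactional.id')
  end
end

producer = ToyProducer.new({ 'bootstrap.servers': 'localhost:9092' })
3.times { producer.transactional? }
puts producer.lookups # prints 1
```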