Module: WaterDrop::Producer::Idempotence

Included in:
WaterDrop::Producer
Defined in:
lib/waterdrop/producer/idempotence.rb

Overview

Component for idempotent producer operations and error recovery

Instance Method Details

#idempotent? ⇒ Boolean

Returns true if current producer is idempotent.

Returns:

  • (Boolean)

    true if current producer is idempotent



# File 'lib/waterdrop/producer/idempotence.rb', line 8

def idempotent?
  # A transactional producer is always idempotent
  return true if transactional?
  return @idempotent unless @idempotent.nil?

  @idempotent = config.kafka.to_h.fetch(:'enable.idempotence', false)
end
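
The resolution order above (transactional first, then the memoized value, then the kafka settings) can be tried in isolation with a minimal sketch. SketchProducer and its transactional: flag are hypothetical stand-ins and not part of WaterDrop; only the body of idempotent? mirrors the source shown above.

```ruby
# Hypothetical stand-in for a producer; NOT part of WaterDrop.
# The `transactional:` flag is an assumption made so the sketch runs on its own.
class SketchProducer
  def initialize(kafka:, transactional: false)
    @kafka = kafka
    @transactional = transactional
    @idempotent = nil
  end

  def transactional?
    @transactional
  end

  # Same decision order as the method documented above
  def idempotent?
    return true if transactional?
    return @idempotent unless @idempotent.nil?

    # Falls back to false when the key is absent
    @idempotent = @kafka.to_h.fetch(:'enable.idempotence', false)
  end
end

plain = SketchProducer.new(kafka: { 'bootstrap.servers': 'localhost:9092' })
idem  = SketchProducer.new(kafka: { 'enable.idempotence': true })
txn   = SketchProducer.new(kafka: {}, transactional: true)

puts plain.idempotent? # false, the key is missing
puts idem.idempotent?  # true, the key is set explicitly
puts txn.idempotent?   # true, transactional short-circuits the lookup
```

The nil check matters because a memoized false is still a valid answer; a plain `@idempotent ||=` would re-read the config on every call for non-idempotent producers.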

#idempotent_reloadable?(error) ⇒ Boolean

Note:

Returns true only if all of the following conditions are met:

  • Error is fatal
  • Producer is idempotent
  • Producer is not transactional
  • reload_on_idempotent_fatal_error config is enabled
  • Error is not in the non_reloadable_errors config list

Checks if the given error should trigger an idempotent producer reload

Parameters:

  • error (Rdkafka::RdkafkaError)

    the error to check

Returns:

  • (Boolean)

    true if the error should trigger a reload



# File 'lib/waterdrop/producer/idempotence.rb', line 27

def idempotent_reloadable?(error)
  return false unless error.fatal?
  return false unless idempotent?
  return false if transactional?
  return false unless config.reload_on_idempotent_fatal_error
  return false if config.non_reloadable_errors.include?(error.code)

  true
end
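
The guard chain above can be exercised without a Kafka connection using a minimal sketch. SketchError, SketchConfig, sketch_reloadable? and the error codes :example_fatal and :example_excluded are hypothetical names invented for illustration; they are not WaterDrop or rdkafka identifiers. Only the order of the checks follows the source.

```ruby
# Hypothetical stand-ins; NOT WaterDrop or rdkafka classes.
SketchError = Struct.new(:code, :fatal) do
  def fatal?
    fatal
  end
end

SketchConfig = Struct.new(
  :reload_on_idempotent_fatal_error,
  :non_reloadable_errors,
  keyword_init: true
)

# Mirrors the five guards of the documented method, with the producer
# state passed in explicitly so the sketch is self-contained.
def sketch_reloadable?(error, config, idempotent:, transactional:)
  return false unless error.fatal?
  return false unless idempotent
  return false if transactional
  return false unless config.reload_on_idempotent_fatal_error
  return false if config.non_reloadable_errors.include?(error.code)

  true
end

config = SketchConfig.new(
  reload_on_idempotent_fatal_error: true,
  non_reloadable_errors: [:example_excluded] # illustrative code only
)

fatal    = SketchError.new(:example_fatal, true)
excluded = SketchError.new(:example_excluded, true)
minor    = SketchError.new(:example_fatal, false)

puts sketch_reloadable?(fatal, config, idempotent: true, transactional: false)    # true
puts sketch_reloadable?(excluded, config, idempotent: true, transactional: false) # false
puts sketch_reloadable?(minor, config, idempotent: true, transactional: false)    # false
puts sketch_reloadable?(fatal, config, idempotent: true, transactional: true)     # false
```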

#idempotent_retryable? ⇒ Boolean

Checks if we can still retry reloading after an idempotent fatal error

Returns:

  • (Boolean)

    true if we haven’t exceeded the max reload attempts yet



# File 'lib/waterdrop/producer/idempotence.rb', line 40

def idempotent_retryable?
  @idempotent_fatal_error_attempts < config.max_attempts_on_idempotent_fatal_error
end
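
The strict less-than comparison above means a limit of N permits exactly N reloads. A minimal sketch of that budget follows; SketchReloader and its reload! method are hypothetical, and incrementing the counter once per reload is an assumption, since the source shown here does not include the code that updates @idempotent_fatal_error_attempts.

```ruby
# Hypothetical stand-in; NOT part of WaterDrop.
class SketchReloader
  attr_reader :attempts

  def initialize(max_attempts)
    @max_attempts = max_attempts
    @attempts = 0
  end

  # Same comparison as the documented method
  def retryable?
    @attempts < @max_attempts
  end

  # Assumption: the counter goes up by one per reload
  def reload!
    @attempts += 1
  end
end

reloader = SketchReloader.new(3)
reloads = 0

while reloader.retryable?
  reloader.reload!
  reloads += 1
end

puts reloads             # 3
puts reloader.retryable? # false, the budget is spent
```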