Module: WaterDrop::Producer::Testing

Defined in:
lib/waterdrop/producer/testing.rb

Overview

Note:

This module should only be used in test environments.

Note:

Requires karafka-rdkafka >= 0.23.1 which includes Rdkafka::Testing support.

Note:

This module is not auto-loaded by Zeitwerk and must be manually required.

Testing utilities for WaterDrop Producer instances.

This module provides methods for triggering and querying fatal errors on producers, which is useful for testing error handling and recovery logic (such as automatic producer reloading on fatal errors).

Examples:

Including for a single producer instance

require 'waterdrop/producer/testing'

producer = WaterDrop::Producer.new
producer.singleton_class.include(WaterDrop::Producer::Testing)
producer.trigger_test_fatal_error(47, "Test producer fencing")

Including for all producers in a test suite

# In spec_helper.rb or test setup:
require 'waterdrop/producer/testing'

WaterDrop::Producer.include(WaterDrop::Producer::Testing)

Testing idempotent producer reload on fatal error

producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.reload_on_idempotent_fatal_error = true
end
producer.singleton_class.include(WaterDrop::Producer::Testing)

# Trigger a fatal error that should cause reload
producer.trigger_test_fatal_error(47, "Invalid producer epoch")

# Produce should succeed after automatic reload
producer.produce_sync(topic: 'test', payload: 'message')

# Fatal error should be cleared after reload
expect(producer.fatal_error).to be_nil

Instance Method Summary collapse

Instance Method Details

#fatal_errorHash?

Checks if a fatal error has occurred on the underlying rdkafka producer.

This method queries librdkafka’s fatal error state to retrieve information about any fatal error that has occurred. Fatal errors are serious errors that prevent the producer from continuing normal operation.

Examples:

Check for fatal error

if error = producer.fatal_error
  puts "Fatal error #{error[:error_code]}: #{error[:error_string]}"
else
  puts "No fatal error present"
end

Verify fatal error after triggering

producer.trigger_test_fatal_error(47, "Test error")
error = producer.fatal_error
expect(error[:error_code]).to eq(47)

Returns:

  • (Hash, nil)

    A hash containing error details if a fatal error occurred, or nil if no fatal error is present. The hash contains: - :error_code [Integer] The librdkafka error code - :error_string [String] Human-readable error description



93
94
95
96
# File 'lib/waterdrop/producer/testing.rb', line 93

def fatal_error
  ensure_testing_support!
  client.fatal_error
end

#trigger_test_fatal_error(error_code, reason) ⇒ Integer

Triggers a test fatal error on the underlying rdkafka producer.

This method uses librdkafka’s test error injection functionality to simulate fatal errors without requiring actual error conditions. This is particularly useful for testing WaterDrop’s fatal error handling and automatic reload logic.

Examples:

Trigger producer fencing error

producer.trigger_test_fatal_error(47, "Test producer fencing scenario")

Trigger invalid producer ID error

producer.trigger_test_fatal_error(64, "Test invalid producer ID mapping")

Parameters:

  • error_code (Integer)

    The librdkafka error code to trigger. Common error codes for testing: - 47 (RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) - Producer fencing - 64 (RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING) - Invalid producer ID

  • reason (String)

    A descriptive reason for the error, used for debugging and logging purposes

Returns:

  • (Integer)

    Result code from rd_kafka_test_fatal_error (0 on success)

Raises:

  • (RuntimeError)

    If the underlying rdkafka client doesn’t support testing



66
67
68
69
# File 'lib/waterdrop/producer/testing.rb', line 66

def trigger_test_fatal_error(error_code, reason)
  ensure_testing_support!
  client.trigger_test_fatal_error(error_code, reason)
end