WaterDrop Middleware
WaterDrop supports injecting middleware similar to Rack.
Middleware can be used to provide extra functionality, such as auto-serialization of data or any other modification of messages before their validation and dispatch.
Each middleware accepts the message hash as input and must return a message hash as a result.
There are two methods to register middleware:
- #prepend: registers middleware as the first in the order of execution
- #append: registers middleware as the last in the order of execution
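To make the resulting order of execution concrete, here is a minimal sketch. `MiddlewareChain` and `Tagger` are hypothetical names written only for this illustration; they are not WaterDrop's internal classes and merely mimic the registration semantics described above, assuming each step responds to `#call`.

```ruby
# Hypothetical stand-in for illustration only; not WaterDrop's implementation
class MiddlewareChain
  def initialize
    @steps = []
  end

  # Registers a step as the first one to run
  def prepend(step)
    @steps.unshift(step)
    self
  end

  # Registers a step as the last one to run
  def append(step)
    @steps.push(step)
    self
  end

  # Passes the message through every step in order, feeding each result to the next step
  def run(message)
    @steps.reduce(message) { |current, step| step.call(current) }
  end
end

# Hypothetical middleware that records its name so the order becomes visible
class Tagger
  def initialize(name)
    @name = name
  end

  def call(message)
    message[:trace] = (message[:trace] || []) + [@name]
    message
  end
end

chain = MiddlewareChain.new
chain.append(Tagger.new('second'))
chain.append(Tagger.new('third'))
chain.prepend(Tagger.new('first'))

result = chain.run({ topic: 'users', payload: '{}' })
puts result[:trace].inspect
# prints ["first", "second", "third"]
```

The prepended step runs before both appended steps even though it was registered last.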
Below you can find an example middleware that converts the incoming payload into a JSON string by running #to_json automatically:
class AutoMapper
  def call(message)
    message[:payload] = message[:payload].to_json
    message
  end
end

# Register middleware
producer.middleware.append(AutoMapper.new)

# Dispatch without manual casting
producer.produce_async(topic: 'users', payload: user)
Message Mutability
It is up to the end user to decide whether to modify the provided message or deep copy it and update the newly created one.
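As a sketch of the non-mutating option, the variant below returns a new hash instead of changing the one it receives. `NonMutatingAutoMapper` is a hypothetical name used only for this example, and the copy made here is shallow, which is an assumption that fits this case because only the top-level `:payload` key is replaced.

```ruby
require 'json'

# Hypothetical variant of AutoMapper that leaves the caller's hash untouched
class NonMutatingAutoMapper
  def call(message)
    # Hash#merge returns a new hash, so the original message is not modified
    message.merge(payload: message[:payload].to_json)
  end
end

original = { topic: 'users', payload: { id: 1 } }
mapped = NonMutatingAutoMapper.new.call(original)

puts original[:payload].is_a?(Hash) # the caller still holds the unserialized payload
puts mapped[:payload]               # the returned copy holds the JSON string
```

Because `Hash#merge` copies only the top level, nested values such as a headers hash remain shared between both messages. When a middleware changes nested values, a deep copy (for example `Marshal.load(Marshal.dump(message))`) avoids affecting the caller's data.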
Round-Robin Distribution Example Middleware
Below, you can find an example of a middleware that implements a round-robin message distribution across available partitions of the selected topic:
class Distributor
  # Ruby `#cycle` is not thread-safe, this is why we use our own
  class ThreadSafeCycle
    def initialize(array)
      @array = array
      # Start in random location not to prefer partition 0
      @index = rand(array.size)
      @mutex = Mutex.new
    end

    def next
      @mutex.synchronize do
        value = @array[@index]
        @index = (@index + 1) % @array.size
        value
      end
    end
  end

  # We need the producer to fetch the number of partitions
  # This will make the distributor dynamic, allowing for graceful support of repartitioning
  # We also support the case of non-existing topics just by assigning partition 0.
  #
  # @param producer [WaterDrop::Producer]
  # @param topics [Array]
  def initialize(producer, topics)
    @producer = producer
    @topics = topics
    @cycles = {}
    @partition_counts = {}
    @mutex = Mutex.new
  end

  # @param message [Hash]
  def call(message)
    topic = message.fetch(:topic)

    # Do nothing unless it is one of the topics we want to round-robin
    return message unless @topics.include?(topic)

    build_iterator(topic)

    # Assign partitions in the round-robin fashion to messages
    message[:partition] = next_partition(topic)

    # Return the morphed message
    message
  end

  private

  # Builds the Ruby iterator we can use to round-robin all the partitions unless it already
  # exists and matches correctly number of partitions
  #
  # @param topic [String]
  def build_iterator(topic)
    partition_count = fetch_partition_count(topic)

    # If we already have number of partitions cached, it means cycle is already prepared
    # as well. If partitions count did not change, we can use it
    return if partition_count == @partition_counts[topic]

    @mutex.synchronize do
      last_partition_id = partition_count - 1
      @cycles[topic] = ThreadSafeCycle.new((0..last_partition_id).to_a)
      @partition_counts[topic] = partition_count
    end
  end

  # @param topic [String]
  #
  # @return [Integer] next partition to which dispatch the message
  def next_partition(topic)
    @mutex.synchronize do
      @cycles.fetch(topic).next
    end
  end

  # @param topic [String] topic for which we want to get number of partitions
  #
  # @return [Integer] number of partitions
  #
  # @note `#partition_count` fetched from rdkafka is cached. No need to cache it again
  # @note Will return 1 partition for topics that do not exist
  def fetch_partition_count(topic)
    @producer.partition_count(topic)
  rescue Rdkafka::RdkafkaError => e
    # This error means topic does not exist, we then assume auto-create and will use 0 for now
    return 1 if e.code == :unknown_topic_or_part

    raise(e)
  end
end
MY_PRODUCER.middleware.append(Distributor.new(MY_PRODUCER, %w[events]))

MY_PRODUCER.produce_async(
  topic: 'events',
  payload: event.to_json
)
Example Use Cases
- Custom Partitioners: By implementing custom partitioning logic within middleware, you gain precise control over how messages are distributed across Kafka partitions. This approach is invaluable for scenarios demanding specific distribution patterns, such as grouping related messages in the same partition to maintain the sequence of events.
- Automatic Data Serialization: Middleware can automatically transform complex data structures into a standardized format like JSON or Avro, streamlining the serialization process. This ensures consistent data formatting across messages and keeps serialization logic neatly encapsulated and separate from core business logic.
- Automatic Headers Injection: Enriching messages with essential metadata becomes effortless with middleware. Automatically appending headers like message creation timestamps or producer identifiers ensures that each message carries all necessary context, facilitating tasks like tracing and auditing without manual intervention.
- Message Validation: Middleware shines in its ability to ensure message integrity by validating each message against specific schemas or business rules before it reaches Kafka. This safeguard mechanism enhances data reliability by preventing invalid data from entering the stream and maintaining the high quality of the data within your topics.
- Dynamic Routing Logic: Middleware allows for the dynamic routing of messages based on content or context, directing messages to the appropriate topics or partitions on the fly. This adaptability is crucial in complex systems where message destinations might change based on factors like payload content, workload distribution, or system state, ensuring that the data flow remains efficient and contextually relevant.
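Three of the use cases above (headers injection, message validation, and dynamic routing) can be sketched as small middleware classes. All class names, header names, the validation rule, and the `_urgent` topic suffix are hypothetical and chosen only for illustration. The steps are chained by hand here so the example runs without a producer; in an application each would be registered with `producer.middleware.append` as shown earlier.

```ruby
require 'time'

# Hypothetical middleware: adds metadata headers without overwriting caller-supplied ones
class HeadersInjector
  def initialize(producer_id)
    @producer_id = producer_id
  end

  def call(message)
    defaults = {
      'producer_id' => @producer_id,
      'created_at' => Time.now.utc.iso8601
    }
    # Values already present in the message take precedence over the defaults
    message[:headers] = defaults.merge(message[:headers] || {})
    message
  end
end

# Hypothetical middleware: enforces an example business rule (non-empty String payload)
class PayloadValidator
  InvalidMessageError = Class.new(StandardError)

  def call(message)
    payload = message[:payload]
    unless payload.is_a?(String) && !payload.empty?
      raise InvalidMessageError, "invalid payload for topic #{message[:topic]}"
    end

    message
  end
end

# Hypothetical middleware: redirects messages marked as high priority to a dedicated topic
class PriorityRouter
  def call(message)
    urgent = message.dig(:headers, 'priority') == 'high'
    message[:topic] = "#{message[:topic]}_urgent" if urgent
    message
  end
end

steps = [HeadersInjector.new('billing-service'), PayloadValidator.new, PriorityRouter.new]
message = { topic: 'events', payload: '{"id":1}', headers: { 'priority' => 'high' } }

# Run the steps in order, feeding each result into the next step
result = steps.reduce(message) { |current, step| step.call(current) }

puts result[:topic]
puts result[:headers].keys.sort.inspect
```

Raising from the validator stops the chain before the message reaches later steps, which is one possible way to keep invalid data out of a topic; returning the message unchanged and logging instead is an equally valid design choice.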