Deserialization
Karafka provides extensive support for custom deserialization processes that accommodate various data representations in your Kafka messages. This guide outlines how Karafka facilitates payload deserialization and extends to deserializing message keys and headers, offering broad flexibility in handling Kafka data. Additionally, it introduces the concept of lazy deserialization, which optimizes performance by delaying the deserialization process until the data is actually needed.
Deserializers for Payload, Key, and Headers
Deserializers transform the raw data of Kafka messages (payload, key, and headers) into a format your application can process. You can configure default deserializers or specify custom deserializers for each topic within your routes configuration.
Each type of deserializer in Karafka accepts different parameters within the #call method and is expected to return a specific type of deserialized data. Below is a brief description of what each deserializer receives and is expected to return:
Deserializer Type | Receives | Expects Return | Input Data Method |
---|---|---|---|
Payload Deserializer | Karafka::Messages::Message | Deserialized Payload | #raw_payload |
Key Deserializer | Karafka::Messages::Metadata | Deserialized Key | #raw_key |
Headers Deserializer | Karafka::Messages::Metadata | Deserialized Headers Hash | #raw_headers |
Below you can find an example of a payload XML deserializer:
class XmlDeserializer
  def call(message)
    # nil case is for tombstone messages
    return nil if message.raw_payload.nil?

    # Hash.from_xml comes from ActiveSupport
    Hash.from_xml(message.raw_payload)
  end
end
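Key and headers deserializers follow the same pattern but receive Karafka::Messages::Metadata and operate on #raw_key and #raw_headers. As a hedged sketch, an integer key deserializer (similar in spirit to the IntegerDeserializer referenced later in this guide) could look like this:
class IntegerDeserializer
  def call(metadata)
    # Messages may be produced without a key
    return nil if metadata.raw_key.nil?

    Integer(metadata.raw_key)
  end
end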
Default Deserializers
Karafka makes specific assumptions about incoming data format, setting defaults for how payloads, keys, and headers are handled unless explicitly overridden by custom deserializers. Here are the default behaviors:
- Payload: Karafka assumes the message payload is in JSON format. This default deserializer automatically parses the raw payload from JSON into a Ruby hash, catering to common data interchange practices and supporting the tombstone event format.
- Key: By default, the key remains a raw string. This approach is practical for most applications where the key is used primarily as an identifier or a partitioning token within Kafka.
- Headers: Headers are also kept in their original format by default. Karafka treats headers as a hash with string keys and string values, which is typical for transmitting metadata associated with the message.
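With these defaults in place, a consumer can access the parsed payload and the raw key and headers directly. A minimal sketch, assuming JSON payloads and a hypothetical OrdersConsumer:
class OrdersConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      message.payload  # => Ruby hash parsed from the JSON payload
      message.key      # => raw key string (or nil)
      message.headers  # => hash with string keys and string values
    end
  end
end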
Configuring Deserializers
Setting Defaults
In Karafka, you can configure default deserializers for all topics by utilizing the #defaults block within your routing configuration. This is particularly useful if your application generally handles messages in a specific format and you wish to apply a consistent deserialization approach across multiple topics.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      deserializers(
        payload: JsonDeserializer.new,
        key: StringDeserializer.new,
        headers: HashDeserializer.new
      )
    end
  end
end
If a specific deserializer is not set for a given element (payload, key, or headers), Karafka will revert to using the predefined defaults: JSON for payloads, raw strings for keys, and unchanged hashes for headers.
Custom Per-Topic Deserializers
While setting default deserializers is a reliable way to maintain consistency across an application, Karafka's true power lies in its flexibility. It allows for detailed customization by configuring deserializers for individual topics, a feature that becomes invaluable when dealing with topics that require specific data handling procedures or when integrating with external systems that use varied data formats.
To set deserializers for a specific topic, you use the deserializers method within the topic configuration block in your routing setup. This allows you to define unique deserialization logic for the payload, key, and headers of messages consumed from that topic.
Here's an example of how to configure deserializers for individual topics:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :financial_transactions do
      consumer TransactionsConsumer

      deserializers(
        payload: AvroDeserializer.new,
        key: IntegerDeserializer.new,
        headers: JsonDeserializer.new
      )
    end

    topic :system_logs do
      consumer LogsConsumer

      deserializers(
        payload: TextDeserializer.new
        # Uses the default framework deserializers
        # for headers and key
      )
    end
  end
end
Per-Topic Deserializer Overrides
When you configure deserializers for a specific topic using the #deserializers method and do not include deserializers for all components (payload, key, headers), be aware that Karafka treats the #deserializers block as atomic. This means that for any component not explicitly defined within the topic's #deserializers block, Karafka will revert to using the framework's built-in defaults, not the overrides specified in the #defaults block.
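To illustrate, here is a hedged sketch reusing deserializer classes named earlier in this guide: with the routing below, the events topic gets the XML payload deserializer, but its key falls back to the framework's raw-string default rather than the IntegerDeserializer declared in #defaults:
# Inside your KarafkaApp routing
routes.draw do
  defaults do
    deserializers(
      payload: JsonDeserializer.new,
      key: IntegerDeserializer.new
    )
  end

  topic :events do
    consumer EventsConsumer

    # This block is atomic: since no key deserializer is given here,
    # the framework default (raw string) is used, not the one from #defaults
    deserializers(
      payload: XmlDeserializer.new
    )
  end
end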
Context Aware Deserialization
In more complex messaging environments, a message's content and format can vary significantly based on its context, such as the topic or specific headers associated with it. Karafka supports context-aware deserialization, where the deserialization logic can adjust dynamically based on additional information from the message itself, enhancing flexibility and robustness in message processing.
Deserializers in Karafka can access the full context of a message, including its headers and the topic it belongs to. This capability allows for dynamic adjustments in the deserialization process based on this context, such as selecting a specific schema or method for decoding the message data. An everyday use case for this is with formats like Avro, where a message header may indicate the schema needed to decode a message.
class AvroDeserializer
  attr_reader :avro

  def initialize(avro:)
    @avro = avro
  end

  def call(message)
    avro.decode(message.raw_payload, schema_name: message.headers['message_type'])
  end
end
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :binary_video_details do
      consumer Videos::DetailsConsumer

      deserializers(
        payload: AvroDeserializer.new(avro: AvroTurf.new)
      )
    end
  end
end
Lazy Deserialization
In Karafka, lazy deserialization extends beyond the payload to include both the message key and headers. This feature enhances performance by delaying the conversion of raw message data into a usable format until it is explicitly required. This approach is especially beneficial for operations such as metadata-based filtering or when dealing with large datasets where minimizing processing overhead is crucial.
Karafka defers the deserialization for the payload, key, and headers. Here's how each component is handled:
- Payload: Deserialization occurs when the Karafka::Messages::Message#payload method is invoked for the first time. The deserialized data is cached, so subsequent accesses do not trigger re-deserialization.
- Key: Similar to payloads, keys are deserialized on the first invocation of the Karafka::Messages::Message#key method, with the result cached for future accesses.
- Headers: Headers are deserialized upon the first call to Karafka::Messages::Message#headers. The deserialized headers are stored to prevent redundant processing.
Access to the raw data for each component is also provided without triggering deserialization:
- Karafka::Messages::Message#raw_payload for payloads,
- Karafka::Messages::Message#raw_key for keys,
- Karafka::Messages::Message#raw_headers for headers.
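For instance, a consumer can filter on raw headers and deserialize only the payloads of the messages it actually needs. A minimal sketch, assuming a hypothetical 'type' header and process_order helper, placed inside a consumer's #consume method:
messages.each do |message|
  # Raw header access does not trigger any deserialization
  next unless message.raw_headers['type'] == 'order'

  # Only the matching messages have their payloads deserialized
  process_order(message.payload)
end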
Here's an expanded example that illustrates the use of lazy deserialization across all components of a message. This example filters messages based on the content of the raw payload and headers, then processes and prints data from the deserialized payload and key:
class EventsConsumer < ApplicationConsumer
  def consume
    messages
      # Limit data amount using raw payload string based scanning
      .select { _1.raw_payload.include?('signature') }
      # Deserialize
      .map(&:payload)
      # Look for data with a particular key
      .select { _1.keys.include?('signature') }
      # Extract what you were looking for
      .map { _1.fetch('signature') }
      # Print only those
      .each { puts _1 }
  end
end
Lazy deserialization provides the following benefits:
- Performance Optimization: By avoiding unnecessary deserialization, Karafka minimizes CPU usage and speeds up processing times, especially in filter-heavy applications.
- Resource Efficiency: Memory usage is optimized, as only the necessary data is processed and stored after deserialization.
- Flexibility and Control: Developers have more control over when and how data is processed, allowing for customized handling based on the content of the messages.
Lazy deserialization in Karafka provides a robust mechanism for managing data processing in a Kafka-based messaging environment. It ensures that applications remain efficient and responsive even as data volume and complexity grow.
Handling of Tombstone Messages
In Apache Kafka, tombstone messages are messages that carry a key but have a null payload. They serve a critical role in Kafka's log compaction feature, which reduces the size of the log by removing outdated records. A tombstone message indicates that any previous messages with the same key should be considered deleted or obsolete, allowing Kafka to retain only the most current data state for each key.
Even if your current application logic does not specifically handle or generate tombstone messages, it is important to design your systems to accommodate them. This ensures that your application can correctly interpret and react to data streams that might include tombstone messages, particularly when integrating with other systems or when changes to data handling policies are implemented.
When creating custom deserializers, you should explicitly manage the possibility of encountering tombstone messages. The deserializer should be able to gracefully handle nil payloads without causing errors or unintended behavior in the application. Here is how you can implement this in a custom XML deserializer:
class XmlDeserializer
  def call(message)
    # Check for tombstone messages where the payload is nil
    return nil if message.raw_payload.nil?

    # Proceed with deserialization if the payload is not nil
    Hash.from_xml(message.raw_payload)
  end
end
By properly managing tombstone messages in your Kafka consumers, you can ensure that your application remains stable and consistent, even when dealing with evolving data states facilitated by Kafka’s log compaction feature.
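On the consumer side, the same principle applies: treat a nil payload as a deletion marker rather than regular data. A minimal sketch, with remove_user and upsert_user standing in as hypothetical persistence helpers:
class UsersConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      if message.payload.nil?
        # Tombstone: the key identifies a record that should be removed
        remove_user(message.key)
      else
        # Regular message: create or update the record state
        upsert_user(message.key, message.payload)
      end
    end
  end
end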
Dynamic Deserialization Based on Topic or Message Metadata
In scenarios where messages originate from various topics and no explicit consumers are set up, you may want to dynamically resolve the deserializer based on some condition, like the topic name or metadata.
Instead of explicitly defining a deserializer for every topic, Karafka allows you to configure a "smarter" default deserializer that adapts dynamically.
You can implement a custom deserializer that evaluates each message and applies the appropriate deserialization strategy based on the message's topic or other metadata (e.g., headers). Karafka will then apply this dynamic deserializer when messages are read via the Admin API, the Iterator API, and the Web UI Explorer.
Here's an example of a dynamic deserializer setup:
# Dynamic deserializer that decides what format to use
# based on the topic name
class DynamicDeserializer
  def call(message)
    case message.topic
    when 'producers'
      JSON.parse(message.raw_payload)
    else
      # Hash.from_xml comes from ActiveSupport
      Hash.from_xml(message.raw_payload)
    end
  end
end
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Set a dynamic resolver for all the payloads of all the topics, even
    # when they are not defined in the routing explicitly
    defaults do
      deserializers(
        payload: DynamicDeserializer.new
      )
    end

    topic :example do
      consumer ExampleConsumer

      # Since dynamic defaults are used, for topics that require an explicit
      # deserializer, you need to set it yourself
      deserializers(
        payload: JsonDeserializer.new
      )
    end
  end
end
This approach simplifies the management of topics and makes the deserialization process more flexible without requiring configuration for each topic that will not be consumed.
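The same pattern can key off message metadata instead of the topic name. A hedged sketch that switches on a hypothetical 'content-type' header:
# Dynamic deserializer that decides what format to use
# based on a header set by the producer
class ContentTypeDeserializer
  def call(message)
    case message.headers['content-type']
    when 'application/json'
      JSON.parse(message.raw_payload)
    else
      # Fall back to the raw string when the format is unknown
      message.raw_payload
    end
  end
end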
Apache Avro
Apache Avro is a data serialization system developed by the Apache Foundation, used widely in the Big Data and cloud computing field. It provides a compact, fast, binary data format with rich data structures. Its schema evolution capability allows for flexibility in data reading and writing, with old software being able to read new data and vice versa. This language-agnostic system aids efficient data interchange between programs and supports a seamless and efficient way of data storage, encoding, and decoding.
From the perspective of the Karafka framework, Avro is just another serialization and deserialization layer; nothing special is required to use it beyond making sure that you both serialize and deserialize your data with it.
In other words, Avro is used to convert complex data structures into a format that can be easily transported over the network or stored, and later be converted back into the original data structure. This is crucial when working with Kafka, as data often needs to be sent between different services or stored for later use.
The Ruby ecosystem offers an excellent gem for working with Avro called avro_turf. This gem provides a simple and effective way to encode and decode data using Avro, and it's compatible with both local filesystem schemas and schema registries.
Let's explain these two concepts:
- Local Filesystem Schemas: When using Avro, you define how your data should be structured using schemas. These schemas are often stored as .avsc files on your local filesystem. avro_turf can read these schema files and use them to encode your data into Avro's binary format or decode binary data back into its original form.
- Schema Registry: A Schema Registry is a server that stores Avro schemas in a central location. It is handy in a microservice architecture where many services may need to share and access common schemas. Keeping your Avro schemas in a Schema Registry allows different services to look up the schema associated with a particular version of data, ensuring that data can be correctly decoded even as your schemas evolve.
Serialization using Avro
Local Filesystem Schemas
To serialize data with Avro and avro_turf for use with Karafka, you'll first need to define an Avro schema for the data you want to send. Once you have the schema, you need to create an Avro reference object, point it to your schema, and ensure that during deserialization, Karafka knows which schema to use:
avro = AvroTurf.new(schemas_path: 'app/schemas/')

message = avro.encode(
  { 'full_name' => 'Jane Doe', 'age' => 28 },
  schema_name: 'person',
  validate: true
)

Karafka.producer.produce_async(
  topic: 'people',
  payload: message,
  # Indicate the schema type in the message headers. A string key is used
  # so it matches how the deserializer reads it later
  headers: { 'message_type' => 'person' }
)
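For reference, the person schema used above would live at app/schemas/person.avsc. A minimal sketch matching the encoded fields (the exact field types are assumptions):
{
  "name": "person",
  "type": "record",
  "fields": [
    { "name": "full_name", "type": "string" },
    { "name": "age", "type": "int" }
  ]
}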
Schema Registry
In the case of a schema registry, you also need to connect to it and select the expected subject for serialization:
avro = AvroTurf::Messaging.new(registry_url: 'http://0.0.0.0:8081')

message = avro.encode(
  { 'title' => 'hello, world' },
  subject: 'greeting',
  version: 1
)

Karafka.producer.produce_async(
  topic: 'greeting',
  payload: message
)
When working with a schema registry, there is no need for a message_type definition, as it will be auto-detected.
Abstracting Avro serialization
If you frequently use Avro for serialization and deserialization with Karafka, creating a wrapper around the Karafka producer can be beneficial. This wrapper can handle the Avro serialization before producing messages to Kafka, reducing redundancy and making your code more concise and manageable:
class AvroProducer
  # Point this to your schemas location
  AVRO = AvroTurf.new(schemas_path: 'app/schemas/')

  class << self
    def produce_async(topic, data, schema)
      message = AVRO.encode(
        data,
        schema_name: schema,
        validate: true
      )

      Karafka.producer.produce_async(
        topic: topic,
        payload: message,
        headers: { 'message_type' => schema }
      )
    end
  end
end
AvroProducer.produce_async(
  'people',
  { 'full_name' => 'Jane Doe', 'age' => 28 },
  'person'
)
Deserialization using Avro
When receiving Avro-encoded messages in a Karafka consumer, you'll need to deserialize these messages back into a usable form. One efficient way to handle this is by creating a custom Karafka deserializer.
Local Filesystem Schemas
Create your Avro deserializer:
class AvroLocalDeserializer
  AVRO = AvroTurf.new(schemas_path: 'app/schemas/')

  def call(message)
    AVRO.decode(
      message.raw_payload,
      schema_name: message.headers['message_type']
    )
  end
end
And indicate that a given topic contains Avro data in your routing setup:
topic :person do
  consumer PersonConsumer

  deserializers(
    payload: AvroLocalDeserializer.new
  )
end
Schema Registry
Create your Avro deserializer:
class AvroRegistryDeserializer
  # Note: in a production system, you may want to pass an authenticated
  # registry client reference here
  AVRO = AvroTurf::Messaging.new(registry_url: 'http://0.0.0.0:8081')

  def call(message)
    AVRO.decode(
      message.raw_payload
    )
  end
end
And indicate that a given topic contains Avro data in your routing setup:
topic :person do
  consumer PersonConsumer

  deserializers(
    payload: AvroRegistryDeserializer.new
  )
end