Deserialization

Karafka, by default, assumes you are receiving and sending JSON payloads.

This means that if you receive, for example, an XML payload, deserialization will fail.

Deserializers convert raw Kafka messages into a workable format. They operate on incoming messages and receive the full message object as their argument.

You can set a default deserializer that will be used for all the topics, or you can specify a deserializer per topic in the routing.

# Hash.from_xml is provided by ActiveSupport
require 'active_support/core_ext/hash/conversions'

class XmlDeserializer
  def call(message)
    # nil case is for tombstone messages
    return nil if message.raw_payload.nil?

    Hash.from_xml(message.raw_payload)
  end
end

App.routes.draw do
  topic :binary_video_details do
    consumer Videos::DetailsConsumer
    deserializer XmlDeserializer.new
  end
end

The default deserializer is Karafka::Serialization::Json::Deserializer.
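
If you want the same deserializer applied to every topic, one option is to declare it once in the routing defaults instead of repeating it per topic. This is a sketch only; it assumes your Karafka version supports the defaults block in routing, so check the routing documentation for your release:

App.routes.draw do
  # Assumes the `defaults` routing block is available in your Karafka version
  defaults do
    deserializer XmlDeserializer.new
  end

  topic :binary_video_details do
    consumer Videos::DetailsConsumer
  end
end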

Deserializers can also use headers and other message metadata, such as the topic, to decode specific formats, for example, Avro:

class AvroDeserializer
  attr_reader :avro

  def initialize(avro:)
    @avro = avro
  end

  def call(message)
    avro.decode(message.raw_payload, subject: message.headers['message_type'])
  end
end

App.routes.draw do
  topic :binary_video_details do
    consumer Videos::DetailsConsumer
    deserializer AvroDeserializer.new(avro: AvroTurf.new)
  end
end

Lazy deserialization

The payload will not be deserialized unless needed. This makes things like metadata-based filtering or raw string-based filtering extremely fast because no data parsing is involved.

Whenever you invoke the Karafka::Messages::Message#payload method, deserialization will happen, and the result will be stored. This means that consecutive #payload invocations on the same message won't deserialize it repeatedly.

If you want to access raw payload data, you can use the Karafka::Messages::Message#raw_payload method.
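
In practice, both accessors can be mixed freely. A minimal sketch, assuming message is a Karafka::Messages::Message received inside a consumer:

message.raw_payload # The raw Kafka payload string - no parsing involved
message.payload     # Deserializes the payload and caches the result
message.payload     # Returns the cached result - no repeated parsing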

Below you can find an example of leveraging this feature. Any time you expect certain characters or keywords in a JSON structure, you can quickly pre-filter the raw data and only deserialize the messages that may contain what you are looking for.

class EventsConsumer < ApplicationConsumer
  def consume
    messages
      # Limit data amount using raw payload string based scanning
      .select { _1.raw_payload.include?('signature') }
      # Deserialize
      .map(&:payload)
      # Look for data with particular key
      .select { _1.keys.include?('signature') }
      # Extract what you were looking for
      .map { _1.fetch('signature') }
      # Print only those
      .each { puts _1 }
  end
end

Handling of tombstone messages

Tombstone messages are messages that contain a valid key but a null value. They are used with Kafka topic compaction to eliminate obsolete records.

Whether or not you plan to use tombstone messages, we highly recommend supporting them in your custom deserializers:

class XmlDeserializer
  def call(message)
    # nil case is for tombstone messages
    return nil if message.raw_payload.nil?

    Hash.from_xml(message.raw_payload)
  end
end
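
On the consumer side, a tombstone then arrives as a message whose #payload is nil. Below is a sketch of reacting to it; VideoDetails and its store/remove methods are hypothetical placeholders for your own persistence logic:

class Videos::DetailsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      if message.payload.nil?
        # Tombstone - the record behind this message key is obsolete
        VideoDetails.remove(message.key)
      else
        VideoDetails.store(message.key, message.payload)
      end
    end
  end
end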