Routing

The routing engine provides an interface to describe how messages from all the topics should be received and consumed.

Due to the dynamic nature of Kafka, you can use multiple configuration options; however, only a few are required.

Routing DSL Organization

Karafka uses consumer groups to subscribe to topics. Each consumer group needs to be subscribed to at least one topic, but you can subscribe it to as many topics as you want. To replicate this concept in our routing DSL, Karafka allows you to configure settings on two levels:

  • settings level - root settings that will be used everywhere
  • topic level - options that need to be set per topic, or overrides of the root-level settings

Most of the settings (apart from the consumer) are optional and, if not configured, will use the defaults provided during the configuration of the app itself.
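
For example, a topic that only declares its consumer will fall back to the app-level settings. A minimal sketch, assuming illustrative config values (only bootstrap.servers is required in practice):

class KarafkaApp < Karafka::App
  setup do |config|
    # Root-level settings used by every topic unless overridden in routing
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
    # Illustrative values; adjust to your needs
    config.max_messages = 100
    config.max_wait_time = 1_000
  end

  routes.draw do
    # Only the consumer is required; everything else uses the defaults above
    topic :example do
      consumer ExampleConsumer
    end
  end
end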

Karafka provides several ways of defining the topics you want to listen to:

Single Consumer Group with Multiple Topics Mode

In this mode, Karafka will create a single consumer group to which all the topics will belong.

This mode is recommended for most use cases and can be changed later.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end

    topic :example2 do
      consumer Example2Consumer
    end
  end
end

Multiple Consumer Groups Mode

In this mode, Karafka will create a separate consumer group for each #consumer_group block, and all the topics defined within a given block will belong to that group.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :group_name do
      topic :example do
        consumer ExampleConsumer
      end

      topic :example2 do
        consumer ExampleConsumer2
      end
    end

    consumer_group :group_name2 do
      topic :example3 do
        consumer Example2Consumer3
      end
    end
  end
end

Multiple Subscription Groups Mode

Karafka uses a concept called subscription groups to organize topics into groups that can be subscribed to Kafka together. This aims to preserve resources by establishing as few connections to Kafka as possible.

Each subscription group connection operates independently in a separate background thread. They do, however, share the worker pool for processing.

All the subscription groups defined within a single consumer group will operate within the same consumer group.
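
Since all subscription group connections feed the same worker pool, its size is controlled by the concurrency setting configured during setup. A minimal sketch (the value shown is illustrative):

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    # Number of worker threads shared by all subscription group
    # connections of this process
    config.concurrency = 5
  end
end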

Pro Subscription Group Multiplexing

Karafka's Open Source version supports one connection per subscription group, without the ability to subscribe to the same topic multiple times from the same process. If you want to establish multiple connections to the same topic from a single process, upgrade to Karafka Pro. Multiplexing allows multiple connections to the same topic within a single subscription group, enhancing performance and parallel processing.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    subscription_group 'a' do
      topic :A do
        consumer ConsumerA
      end

      topic :B do
        consumer ConsumerB
      end

      topic :D do
        consumer ConsumerD
      end
    end

    subscription_group 'b' do
      topic :C do
        consumer ConsumerC
      end
    end
  end
end

You can read more about the concurrency implications of using subscription groups here.

Subscription Group Multiplexing

For those using the advanced options in Karafka Pro, we have a special page dedicated to the Multiplexing feature. Multiplexing allows you to establish multiple independent connections to Kafka to subscribe to one topic from a single process. This detailed resource covers everything you need to know about how Multiplexing works, how to set it up, and tips for using it effectively. To learn all about this feature and make the most of it, please check out the Multiplexing documentation.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Always establish two independent connections to this topic from every
    # single process. They will be able to poll and process data independently
    subscription_group 'a', multiplex: 2 do
      topic :A do
        consumer ConsumerA
      end
    end
  end
end

Routing Patterns

For users leveraging the advanced capabilities of Karafka Pro, the Routing Patterns feature has its dedicated documentation page. This page delves deep into the behavior, configuration, and best practices surrounding Routing Patterns. Please refer to the Routing Patterns documentation to explore this feature in detail and gain comprehensive insights.
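
As a rough illustration (this is a Pro feature; refer to the Routing Patterns documentation for the authoritative API), patterns let you match topics by a regular expression instead of listing them by name. The topic naming and consumer class below are made up:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Route every topic whose name ends with "_events" to the same consumer
    pattern(/.*_events\z/) do
      consumer EventsConsumer
    end
  end
end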

Overriding Defaults

Almost all the default settings can be overridden at the topic level. This means that you can provide each topic with its own details in case you need a non-standard way of doing things (for example, when you need batch consuming only for a single topic).

Shared Defaults

This option allows you to define default settings that apply to all the topics defined in your routing unless they are explicitly overridden when describing a given topic. This not only simplifies configuration but also ensures consistency throughout your application.

Here's how you can set up routing defaults and then define a topic that overrides one of those defaults:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA
      # When overwriting defaults, all settings need to be
      # redefined for a given method. Partial redefinition
      # is not allowed and will not work
      config(
        partitions: 2,
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :B do
      consumer ConsumerB
    end
  end
end

When you decide to override any default option for a topic within the #topic block, it's crucial to understand that you must set all the arguments for that particular option. Partial updating of arguments is not supported.

Karafka will not use the user-specified defaults you've defined in the defaults block if you attempt to update the arguments for an option partially. Instead, it will revert to the framework's internal defaults for the missing arguments. This could lead to unexpected behavior in your application if not considered.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA

      # BAD idea because `replication_factor` is going to be set
      # to `1` as it is the framework default
      config(partitions: 2)
    end
  end
end

Topic Level Options

There are several options you can set inside the topic block. All of them, except consumer, are optional. Here are the most important ones:

  • active (Boolean) - Set to false if you want to have the given topic defined but not consumed. Helpful when working with topics via the admin API.
  • consumer (Class) - Name of the consumer class that we want to use to consume messages from the given topic.
  • deserializers (Hash) - Deserializers used to deserialize the incoming data (payload, key, and headers).
  • manual_offset_management (Boolean) - Whether Karafka should automatically mark messages as consumed.
  • long_running_job (Boolean) - Converts this topic's consumer into a job that can run longer than max.poll.interval.ms.
  • virtual_partitions (Hash) - Allows you to parallelize the processing of data from a single partition.
  • dead_letter_queue (Hash) - Provides a systematic way of dealing with persistent consumption errors.
  • delay_by (Integer) - Delays message processing from specific topics for a specified time.
  • expire_in (Integer) - Automatically excludes messages from processing in case they are too old.
  • filter (#call) - Allows users to filter messages based on specific criteria.
  • config (Hash) - Allows for specifying each of the topic settings and their creation via the CLI commands.
  • kafka (Hash) - Allows you to configure an alternative cluster on a per-topic basis for a multi-cluster setup.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :videos_consumer do
      topic :binary_video_details do
        config(partitions: 2)
        consumer Videos::DetailsConsumer
        deserializers(
          payload: Serialization::Binary::Deserializer.new
        )
      end

      topic :new_videos do
        config(partitions: 5, replication_factor: 4)
        consumer Videos::NewVideosConsumer
      end
    end

    topic :events do
      config(partitions: 1, 'cleanup.policy': 'compact')
      # Set to false because not for consumption.
      # Only inspection via admin API.
      active false
      deserializers(
        payload: EventsDeserializer.new
      )
    end
  end
end
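
To illustrate a few more of the options listed above, here is a short sketch combining manual offset management with a Dead Letter Queue; the topic, DLQ topic, and consumer names are made up:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders_events do
      consumer OrdersEventsConsumer
      # Mark offsets manually from within the consumer
      manual_offset_management true
      # Move messages that keep failing to a dedicated topic
      dead_letter_queue(topic: 'orders_events_dlq', max_retries: 3)
    end
  end
end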

Kafka Scope Configuration Reuse

Karafka uses the inherit flag to support partial Kafka routing reconfiguration at the topic level. This allows you to maintain a consistent base configuration while applying specific alterations to individual topics. When the inherit flag is true, the topic's Kafka settings will merge with the root-level defaults, enabling more granular and flexible configurations without redefining all settings.

This feature is handy in scenarios where most settings remain consistent across topics, but a few need to be customized. By leveraging the inherit option, you can streamline your configurations, reduce redundancy, and ensure that only the necessary changes are applied on a per-topic basis.

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092',
      'auto.offset.reset': 'earliest'
    }
  end

  routes.draw do
    # Topic that inherits base kafka settings and adds a specific one
    topic :example1 do
      consumer ExampleConsumer1
      kafka(
        'enable.partition.eof': true,
        inherit: true
      )
    end

    # Topic with its own kafka settings without inheritance
    topic :example2 do
      consumer ExampleConsumer2
      kafka(
        'bootstrap.servers': '127.0.0.1:9092',
        'enable.partition.eof': true
      )
    end

    # Another topic that inherits base kafka settings and adds a specific one
    topic :example3 do
      consumer ExampleConsumer3
      kafka(
        'fetch.message.max.bytes': 10_000_000,
        inherit: true
      )
    end
  end
end

Kafka Scope Config Reuse Without inherit

When using Kafka scope configuration at the topic level, be aware that without the inherit setting, there is no automatic inheritance of Kafka settings from the root-level configuration. This means you must duplicate all Kafka scope configurations for each topic. Failure to do so can result in default settings being applied instead of the intended configuration, which might lead to unexpected behavior or inconsistent setup across your topics.
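
To make the pitfall concrete, here is a sketch under the same root configuration used above: without inherit: true, a topic-level kafka block replaces the root settings entirely, so anything not repeated falls back to the defaults rather than your root configuration.

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092',
      'auto.offset.reset': 'earliest'
    }
  end

  routes.draw do
    topic :example4 do
      consumer ExampleConsumer4
      # Without `inherit: true`, this block replaces the root kafka settings,
      # so 'auto.offset.reset': 'earliest' from the root config is NOT applied
      # to this topic
      kafka(
        'bootstrap.servers': '127.0.0.1:9092',
        'enable.partition.eof': true
      )
    end
  end
end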

Modular Monolith Approach

A Modular Monolith architecture focuses on separating a monolith application into well-defined, loosely coupled modules. These modules can evolve and scale independently but still operate as part of a single unit. With Karafka, embracing this architecture becomes efficient due to its flexible routing mechanism.

One of the strengths of Karafka's routing is that the #draw method can be called multiple times. In a Modular Monolith architecture, this means each of your application's modules can define its own set of topic routes. This brings several benefits:

  • Decoupling: Each module can define and manage its message routing without interfering with others.

  • Scalability: As modules grow, they can independently evolve their messaging strategies.

  • Maintainability: Changes to routing in one module won't impact others, making it easier to manage and refactor.

Within each module, you can define a Karafka routing block using the #draw method:

# app/modules/orders/karafka_routes.rb
Karafka::App.routes.draw do
  topic :order_created do
    consumer Orders::OrderCreatedConsumer
  end

  topic :order_updated do
    consumer Orders::OrderUpdatedConsumer
  end
end

# app/modules/users/karafka_routes.rb
Karafka::App.routes.draw do
  topic :user_registered do
    consumer Users::UserRegisteredConsumer
  end
end

By leveraging the ability to draw routes multiple times, Karafka seamlessly fits into a Modular Monolith architecture. This allows for improved code organization, easier maintenance, and the flexibility to evolve each module independently.