Routing
The routing engine provides an interface to describe how messages from all the topics should be received and consumed.
Due to the dynamic nature of Kafka, you can use multiple configuration options; however, only a few are required.
Routing DSL Organization
Karafka uses consumer groups to subscribe to topics. Each consumer group needs to be subscribed to at least one topic (but you can subscribe it to as many topics as you want). To replicate this concept in our routing DSL, Karafka allows you to configure settings on two levels:
- settings level - root settings that will be used everywhere
- topic level - options that need to be set on a per topic level or overrides to options set on a root level
Most of the settings (apart from the consumer) are optional and, if not configured, will use the defaults provided during the configuration of the app itself.
Karafka provides two ways of defining topics on which you want to listen:
Single Consumer Group with Multiple Topics Mode
In this mode, Karafka will create a single consumer group to which all the topics will belong.
It is recommended for most use cases and can be changed later.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end

    topic :example2 do
      consumer Example2Consumer
    end
  end
end
Multiple Consumer Groups Mode
In this mode, Karafka will use a separate consumer group for each #consumer_group block, subscribing it to all the topics defined within that block.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :group_name do
      topic :example do
        consumer ExampleConsumer
      end

      topic :example2 do
        consumer ExampleConsumer2
      end
    end

    consumer_group :group_name2 do
      topic :example3 do
        consumer Example2Consumer3
      end
    end
  end
end
Multiple Subscription Groups Mode
Karafka uses a concept called subscription groups to organize topics into groups that can be subscribed to Kafka together. This aims to preserve resources by using as few connections to Kafka as possible.
Each subscription group connection operates independently in a separate background thread. They do, however, share the workers pool for processing.
All the subscription groups defined within a single consumer group will operate within the same consumer group.
Pro Subscription Group Multiplexing
Karafka's Open Source version supports one connection per subscription group, without the ability to subscribe to the same topic multiple times from the same process. If you want to establish multiple connections to the same topic from a single process, upgrade to Karafka Pro. Multiplexing allows multiple connections to the same topic within a single subscription group, enhancing performance and parallel processing.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    subscription_group 'a' do
      topic :A do
        consumer ConsumerA
      end

      topic :B do
        consumer ConsumerB
      end

      topic :D do
        consumer ConsumerD
      end
    end

    subscription_group 'b' do
      topic :C do
        consumer ConsumerC
      end
    end
  end
end
You can read more about the concurrency implications of using subscription groups here.
Subscription Group Multiplexing
For those using the advanced options in Karafka Pro, we have a special page dedicated to the Multiplexing feature. Multiplexing allows you to establish multiple independent connections to Kafka to subscribe to one topic from a single process. This detailed resource covers everything you need to know about how Multiplexing works, how to set it up, and tips for using it effectively. To learn all about this feature and make the most of it, please check out the Multiplexing documentation.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Always establish two independent connections to this topic from every
    # single process. They will be able to poll and process data independently
    subscription_group 'a', multiplex: 2 do
      topic :A do
        consumer ConsumerA
      end
    end
  end
end
Routing Patterns
For users leveraging the advanced capabilities of Karafka Pro, the Routing Patterns feature has its dedicated documentation page. This page delves deep into the behavior, configuration, and best practices surrounding Routing Patterns. Please refer to the Routing Patterns documentation to explore this feature in detail and gain comprehensive insights.
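For illustration only, here is a minimal sketch of what a pattern-based route might look like, assuming Karafka Pro with the Patterns feature enabled and a hypothetical DlqConsumer class; the Routing Patterns documentation remains the authoritative reference for the exact syntax and options:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Subscribe to any topic whose name matches the regular expression,
    # including topics created after the process has started
    pattern(/.*_dlq/) do
      # DlqConsumer is a hypothetical consumer class used only for illustration
      consumer DlqConsumer
    end
  end
end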
Overriding Defaults
Almost all of the configured default settings can be changed at the topic level. This means that you can provide each topic with specific settings in case you need a non-standard way of doing things (for example, you need batch consuming only for a single topic).
Shared Defaults
This option allows you to define default settings that apply to all the topics defined in your routing unless they are set explicitly when describing the appropriate topic. This not only simplifies configuration but also ensures consistency throughout your application.
Here's how you can set up routing defaults and then define a topic that overrides one of those defaults:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA

      # When overwriting defaults, all settings need to be
      # redefined for a given method. Partial redefinition
      # is not allowed and will not work
      config(
        partitions: 2,
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :B do
      consumer ConsumerB
    end
  end
end
When you decide to override any default option for a topic within the #topic block, it's crucial to understand that you must set all the arguments for that particular option. Partial updating of arguments is not supported.
Karafka will not use the user-specified defaults you've defined in the defaults block if you attempt to update the arguments for an option partially. Instead, it will revert to the framework's internal defaults for the missing arguments. This could lead to unexpected behavior in your application if not considered.
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA

      # BAD idea because `replication_factor` is going to be set
      # to `1` as it is the framework default
      config(partitions: 2)
    end
  end
end
Topic Level Options
There are several options you can set inside the topic block. All of them, except consumer, are optional. Here are the most important ones:
| Option | Value type | Description |
|---|---|---|
| active | Boolean | Set to false if you want to have the given topic defined but not consumed. Helpful when working with topics via the admin API |
| consumer | Class | Name of the consumer class that we want to use to consume messages from a given topic |
| deserializers | Hash | Deserializers that we want to use to deserialize the incoming data (payload, key and headers) |
| manual_offset_management | Boolean | Disables automatic offset management; when enabled, Karafka will not mark messages as consumed automatically |
| long_running_job | Boolean | Converts this topic consumer into a job that can run longer than max.poll.interval.ms |
| virtual_partitions | Hash | Allows you to parallelize the processing of data from a single partition |
| dead_letter_queue | Hash | Provides a systematic way of dealing with persistent consumption errors |
| delay_by | Integer | Feature that enables delaying message processing from specific topics for a specified time |
| expire_in | Integer | Feature that allows messages to be excluded from processing automatically in case they are too old |
| filter | #call | Feature that allows users to filter messages based on specific criteria |
| config | Hash | Allows for specifying each of the topic settings and their creation via the CLI commands |
| kafka | Hash | Allows you to configure an alternative cluster on a per-topic basis for a multi-cluster setup |
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :videos_consumer do
      topic :binary_video_details do
        config(partitions: 2)
        consumer Videos::DetailsConsumer
        deserializers(
          payload: Serialization::Binary::Deserializer.new
        )
      end

      topic :new_videos do
        config(partitions: 5, replication_factor: 4)
        consumer Videos::NewVideosConsumer
      end
    end

    topic :events do
      config(partitions: 1, 'cleanup.policy': 'compact')

      # Set to false because not for consumption.
      # Only inspection via admin API.
      active false

      deserializers(
        payload: EventsDeserializer.new
      )
    end
  end
end
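To illustrate a few of the options from the table above in one place, below is a hedged sketch of a single topic combining several of them. The orders_events topic and the Orders::EventsConsumer class are hypothetical, and long_running_job and delay_by are Karafka Pro features; please consult their dedicated documentation pages for exact semantics:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders_events do
      # Hypothetical consumer class used only for illustration
      consumer Orders::EventsConsumer

      # Take full control over marking messages as consumed
      manual_offset_management true

      # Route messages that keep failing to a dedicated topic
      dead_letter_queue(
        topic: 'orders_events_dlq',
        max_retries: 3
      )

      # Pro: allow processing to run longer than max.poll.interval.ms
      long_running_job true

      # Pro: postpone processing of each message by one minute
      delay_by(60_000)
    end
  end
end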
Kafka Scope Configuration Reuse
Karafka uses the inherit flag to support partial Kafka routing reconfiguration at the topic level. This allows you to maintain a consistent base configuration while applying specific alterations to individual topics. When the inherit flag is set to true, the topic's Kafka settings will be merged with the root-level defaults, enabling more granular and flexible configurations without redefining all settings.
This feature is handy in scenarios where most settings remain consistent across topics, but a few need to be customized. By leveraging the inherit option, you can streamline your configurations, reduce redundancy, and ensure that only the necessary changes are applied on a per-topic basis.
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092',
      'auto.offset.reset': 'earliest'
    }
  end

  routes.draw do
    # Topic that inherits base kafka settings and adds a specific one
    topic :example1 do
      consumer ExampleConsumer1
      kafka(
        'enable.partition.eof': true,
        inherit: true
      )
    end

    # Topic with its own kafka settings without inheritance
    topic :example2 do
      consumer ExampleConsumer2
      kafka(
        'bootstrap.servers': '127.0.0.1:9092',
        'enable.partition.eof': true
      )
    end

    # Another topic that inherits base kafka settings and adds a specific one
    topic :example3 do
      consumer ExampleConsumer3
      kafka(
        'fetch.message.max.bytes': 10_000_000,
        inherit: true
      )
    end
  end
end
Kafka Scope Config Reuse Without inherit
When using Kafka scope configuration at the topic level, be aware that without the inherit setting, there is no automatic inheritance of Kafka settings from the root-level configuration. This means you must duplicate all Kafka scope configurations for each topic. Failure to do so can result in default settings being applied instead of the intended configuration, which might lead to unexpected behavior or an inconsistent setup across your topics.
Modular Monolith Approach
A Modular Monolith architecture focuses on separating a monolith application into well-defined, loosely coupled modules. These modules can evolve and scale independently but still operate as part of a single unit. With Karafka, embracing this architecture becomes efficient due to its flexible routing mechanism.
One of the beauties of Karafka's routing is the ability to call the #draw method multiple times. In a Modular Monolith architecture, each of your application's modules can define its own set of topic routes.
- Decoupling: Each module can define and manage its message routing without interfering with others.
- Scalability: As modules grow, they can independently evolve their messaging strategies.
- Maintainability: Changes to routing in one module won't impact others, making it easier to manage and refactor.
Within each module, you can define a Karafka routing block using the #draw method:
# app/modules/orders/karafka_routes.rb
Karafka::App.routes.draw do
  topic :order_created do
    consumer Orders::OrderCreatedConsumer
  end

  topic :order_updated do
    consumer Orders::OrderUpdatedConsumer
  end
end

# app/modules/users/karafka_routes.rb
Karafka::App.routes.draw do
  topic :user_registered do
    consumer Users::UserRegisteredConsumer
  end
end
By leveraging the ability to draw routes multiple times, Karafka seamlessly fits into a Modular Monolith architecture. This allows for improved code organization, easier maintenance, and the flexibility to evolve each module independently.