The routing engine provides an interface to describe how messages from all the topics should be received and consumed. Due to the dynamic nature of Kafka, you can use multiple configuration options; however, only a few are required.

## Routing DSL Organization

Karafka uses consumer groups to subscribe to topics. Each consumer group needs to be subscribed to at least one topic (but you can subscribe with it to as many topics as you want). To replicate this concept in our routing DSL, Karafka allows you to configure settings on two levels:

* settings level - root settings that will be used everywhere
* topic level - options that need to be set on a per-topic basis or overrides to options set on the root level

!!! note ""

    Most of the settings (apart from the `consumer`) are optional and, if not configured, will use the defaults provided during the [configuration](https://karafka.io/docs/Configuration.md) of the app itself.

Karafka provides two ways of defining the topics on which you want to listen:

### Single Consumer Group with Multiple Topics Mode

In this mode, Karafka will create a single consumer group to which all the topics will belong. It is recommended for most use cases and can be changed later.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end

    topic :example2 do
      consumer Example2Consumer
    end
  end
end
```

### Multiple Consumer Groups Mode

In this mode, Karafka will use a separate consumer group for each `#consumer_group` block; all the topics defined within a given block will belong to that group.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :group_name do
      topic :example do
        consumer ExampleConsumer
      end

      topic :example2 do
        consumer ExampleConsumer2
      end
    end

    consumer_group :group_name2 do
      topic :example3 do
        consumer Example2Consumer3
      end
    end
  end
end
```

### Multiple Subscription Groups Mode

Karafka uses a concept called `subscription groups` to organize topics into groups that can be subscribed to Kafka together. This aims to preserve resources by achieving as few connections to Kafka as possible. Each subscription group connection operates independently in a separate background thread. They do, however, share the workers pool for processing. All the subscription groups defined within a single consumer group will operate within the same consumer group.

!!! note "Pro Subscription Group Multiplexing"

    Karafka's Open Source version supports one connection per subscription group, without the ability to subscribe to the same topic multiple times from the same process. If you want to establish many connections to the same topic from a single process, upgrade to Karafka Pro. [Multiplexing](https://karafka.io/docs/Pro-Multiplexing.md) allows multiple connections to the same topic within a single subscription group, enhancing performance and parallel processing.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    subscription_group 'a' do
      topic :A do
        consumer ConsumerA
      end

      topic :B do
        consumer ConsumerB
      end

      topic :D do
        consumer ConsumerD
      end
    end

    subscription_group 'b' do
      topic :C do
        consumer ConsumerC
      end
    end
  end
end
```

You can read more about the concurrency implications of using subscription groups [here](https://karafka.io/docs/Concurrency-and-Multithreading.md#parallel-kafka-connections-within-a-single-consumer-group-subscription-groups).

### Subscription Group Multiplexing

For those using the advanced options in Karafka Pro, we have a special page dedicated to the Multiplexing feature.
Multiplexing allows you to establish multiple independent connections to Kafka to subscribe to one topic from a single process. This detailed resource covers everything you need to know about how Multiplexing works, how to set it up, and tips for using it effectively. To learn all about this feature and make the most of it, please check out the [Multiplexing](https://karafka.io/docs/Pro-Multiplexing.md) documentation.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Always establish two independent connections to this topic from every
    # single process. They will be able to poll and process data independently
    subscription_group 'a', multiplex: 2 do
      topic :A do
        consumer ConsumerA
      end
    end
  end
end
```

### Routing Patterns

For users leveraging the advanced capabilities of Karafka Pro, the Routing Patterns feature has its dedicated documentation page. This page delves deep into the behavior, configuration, and best practices surrounding Routing Patterns. Please refer to the [Routing Patterns documentation](https://karafka.io/docs/Pro-Routing-Patterns.md) to explore this feature in detail and gain comprehensive insights.

## Overriding Defaults

Almost all of the default settings can be changed on the `topic` level. This means that you can provide each topic with its own details in case you need a non-standard way of doing things (for example, you need batch consuming only for a single topic).

## Shared Defaults

This option allows you to define default settings that apply to all the topics defined in your routing unless they are defined explicitly when describing the appropriate topic. This not only simplifies configurations but also ensures consistency throughout your application.

Here's how you can set up routing defaults and then define a topic that overrides one of those defaults:

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA

      # When overwriting defaults, all settings need to be
      # redefined for a given method. Partial redefinition
      # is not allowed and will not work
      config(
        partitions: 2,
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :B do
      consumer ConsumerB
    end
  end
end
```

When you decide to override any default option for a topic within the `#topic` block, it's crucial to understand that you must set **all** the arguments for that particular option. Partial updating of arguments is not supported. If you attempt to update the arguments for an option partially, Karafka will not use the user-specified defaults you've defined in the `defaults` block. Instead, it will revert to the framework's internal defaults for the missing arguments. This could lead to unexpected behavior in your application if not considered.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    defaults do
      config(
        # Ensure there are always 5 partitions by default
        partitions: 5,
        # Make sure that topic is replicated in production
        replication_factor: Rails.env.production? ? 2 : 1
      )
    end

    topic :A do
      consumer ConsumerA

      # BAD idea because `replication_factor` is going to be set
      # to `1` as it is the framework default
      config(partitions: 2)
    end
  end
end
```

## Topic Level Options

There are several options you can set inside of the `topic` block. All of them except `consumer` are optional. Here are the most important ones:

| Option | Value type | Description |
|--------|------------|-------------|
| active | Boolean | Set to `false` if you want to have the given topic defined but not consumed. Helpful when working with topics via the admin API |
| [consumer](https://karafka.io/docs/Consuming-Messages.md) | Class | Name of a consumer class that we want to use to consume messages from a given topic |
| [deserializers](https://karafka.io/docs/Deserialization.md) | Hash | Names of deserializers that we want to use to deserialize the incoming data (payload, key and headers) |
| [manual_offset_management](https://karafka.io/docs/Offset-management.md#manual-offset-management) | Boolean | Should Karafka automatically mark messages as consumed or not |
| [long_running_job](https://karafka.io/docs/Pro-Long-Running-Jobs.md) | Boolean | Converts this topic consumer into a job that can run longer than `max.poll.interval.ms` |
| [virtual_partitions](https://karafka.io/docs/Pro-Virtual-Partitions.md) | Hash | Allows you to parallelize the processing of data from a single partition |
| [dead_letter_queue](https://karafka.io/docs/Dead-Letter-Queue.md) | Hash | Provides a systematic way of dealing with persistent consumption errors |
| [delay_by](https://karafka.io/docs/Pro-Delayed-Topics.md) | Integer | Feature that enables delaying message processing from specific topics for a specified time |
| [expire_in](https://karafka.io/docs/Pro-Expiring-Messages.md) | Integer | Feature that allows messages to be excluded from processing automatically in case they are too old |
| [filter](https://karafka.io/docs/Pro-Filtering-API.md) | `#call` | Feature that allows users to filter messages based on specific criteria |
| [config](https://karafka.io/docs/Declarative-Topics.md) | Hash | Allows for specifying each of the topic settings and their creation via the CLI commands |
| [kafka](https://karafka.io/docs/Multi-Cluster-Setup.md) | Hash | Allows you to configure an alternative cluster on a per-topic basis for a multi-cluster setup |

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    consumer_group :videos_consumer do
      topic :binary_video_details do
        config(partitions: 2)
        consumer Videos::DetailsConsumer
        deserializers(
          payload: Serialization::Binary::Deserializer.new
        )
      end

      topic :new_videos do
        config(partitions: 5, replication_factor: 4)
        consumer Videos::NewVideosConsumer
      end
    end

    topic :events do
      config(partitions: 1, 'cleanup.policy': 'compact')
      # Set to false because not for consumption.
      # Only inspection via admin API.
      active false
      deserializers(
        payload: EventsDeserializer.new
      )
    end
  end
end
```

## Kafka Scope Configuration Reuse

Karafka uses the `inherit` flag to support partial Kafka routing reconfiguration at the topic level. This allows you to maintain a consistent base configuration while applying specific alterations to individual topics. When the `inherit` flag is `true`, the topic's Kafka settings will merge with the root-level defaults, enabling more granular and flexible configurations without redefining all settings.

This feature is handy in scenarios where most settings remain consistent across topics, but a few need to be customized. By leveraging the `inherit` option, you can streamline your configurations, reduce redundancy, and ensure that only the necessary changes are applied on a per-topic basis.

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092',
      'auto.offset.reset': 'earliest'
    }
  end

  routes.draw do
    # Topic that inherits base kafka settings and adds a specific one
    topic :example1 do
      consumer ExampleConsumer1

      kafka(
        'enable.partition.eof': true,
        inherit: true
      )
    end

    # Topic with its own kafka settings without inheritance
    topic :example2 do
      consumer ExampleConsumer2

      kafka(
        'bootstrap.servers': '127.0.0.1:9092',
        'enable.partition.eof': true
      )
    end

    # Another topic that inherits base kafka settings and adds a specific one
    topic :example3 do
      consumer ExampleConsumer3

      kafka(
        'fetch.message.max.bytes': 10_000_000,
        inherit: true
      )
    end
  end
end
```

!!! Warning "Kafka Scope Config Reuse Without `inherit`"

    When using Kafka scope configuration at the topic level, be aware that **without** the `inherit` setting, there is no automatic inheritance of Kafka settings from the root-level configuration. This means you must **duplicate** all Kafka scope configurations for each topic. Failure to do so can result in default settings being applied instead of the intended configuration, which might lead to unexpected behavior or an inconsistent setup across your topics.

## Modular Monolith Approach

A Modular Monolith architecture focuses on separating a monolith application into well-defined, loosely coupled modules. These modules can evolve and scale independently but still operate as part of a single unit. With Karafka, embracing this architecture becomes efficient due to its flexible routing mechanism.

One of the beauties of Karafka's routing is the ability to call the `#draw` method multiple times. In a Modular Monolith architecture context, each of your application's modules can define its own set of topic routes.

- **Decoupling**: Each module can define and manage its message routing without interfering with others.
- **Scalability**: As modules grow, they can independently evolve their messaging strategies.
- **Maintainability**: Changes to routing in one module won't impact others, making it easier to manage and refactor.

Within each module, you can define a Karafka routing block using the `#draw` method:

```ruby
# app/modules/orders/karafka_routes.rb
Karafka.routing.draw do
  topic :order_created do
    consumer Orders::OrderCreatedConsumer
  end

  topic :order_updated do
    consumer Orders::OrderUpdatedConsumer
  end
end
```

```ruby
# app/modules/users/karafka_routes.rb
Karafka.routing.draw do
  topic :user_registered do
    consumer Users::UserRegisteredConsumer
  end
end
```

By leveraging the ability to draw routes multiple times, Karafka seamlessly fits into a Modular Monolith architecture.
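The accumulation idea behind repeated `#draw` calls can be pictured with a small plain-Ruby sketch. This is an illustration only: the `TinyRoutes` class and its methods are made up for the example and are not Karafka internals.

```ruby
# Illustration only: a minimal registry showing how repeated `draw`
# calls accumulate routes instead of replacing them.
class TinyRoutes
  def initialize
    @topics = {}
  end

  # Each call evaluates the block against the same registry, so new
  # topic => consumer mappings merge with the ones drawn earlier.
  def draw(&block)
    instance_eval(&block)
    self
  end

  def topic(name, consumer)
    @topics[name] = consumer
  end

  def consumers
    @topics
  end
end

routes = TinyRoutes.new

# Module A contributes its routes...
routes.draw { topic(:order_created, 'Orders::OrderCreatedConsumer') }
# ...and module B contributes its own without touching module A's.
routes.draw { topic(:user_registered, 'Users::UserRegisteredConsumer') }

routes.consumers
# => { order_created: "Orders::OrderCreatedConsumer",
#      user_registered: "Users::UserRegisteredConsumer" }
```

Because every call merges into the same registry, independently loaded per-module route files end up in one routing table.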
This allows for improved code organization, easier maintenance, and the flexibility to evolve each module independently.

---

*Last modified: 2025-05-17 10:43:50*