Upgrading to Karafka 2.4
Breaking Changes
This release contains BREAKING changes.
Because consumer mappers have been removed, you must update your routing so that the consumer group names match the ones used before the upgrade. Do not skip this step.
Pro & Enterprise Upgrade Support
If you're gearing up to upgrade to the latest Karafka version and are a Pro or Enterprise user, remember you've got a dedicated lifeline! Reach out via the dedicated Slack channel for direct support to ensure everything has been covered.
As always, please make sure you have upgraded to the most recent version of 2.3 before upgrading to 2.4.
Also, remember to read and apply our standard upgrade procedures.
Ruby 2.7 is EOL
Ruby 2.7 is no longer supported. If you still use it, you cannot upgrade to Karafka 2.4.
WaterDrop 2.7 Changes
Karafka 2.4 relies on WaterDrop 2.7. If you are using and upgrading only WaterDrop, you can find the appropriate changelog here.
Below you can find a list of breaking changes in WaterDrop 2.7.
wait_timeout Configuration No Longer Needed
The wait_timeout WaterDrop configuration option is no longer needed. You can safely remove it.
producer = WaterDrop::Producer.new
producer.setup do |config|
  # Other config...
  # Remove this, no longer needed
  config.wait_timeout = 30
end
Time Settings Format Alignment
All time-related values are now configured in milliseconds instead of some being in seconds and some in milliseconds.
The values that were changed from seconds to milliseconds are:
max_wait_timeout
wait_backoff_on_queue_full
wait_timeout_on_queue_full
wait_backoff_on_transaction_command, default
If you have configured any of those yourself, please replace the seconds representation with milliseconds:
producer = WaterDrop::Producer.new
producer.setup do |config|
  config.deliver = true
  # Replace this:
  config.max_wait_timeout = 30
  # With
  config.max_wait_timeout = 30_000
  # ...
end
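If you have several of these values configured, converting them mechanically may be less error-prone than editing each one by hand. The sketch below is plain Ruby and does not touch WaterDrop itself. The setting names come from the list above, while the `to_milliseconds` helper and the `legacy` hash are hypothetical names used only for illustration.

```ruby
# Settings that switched from seconds to milliseconds in WaterDrop 2.7
# (names taken from the list above).
SECONDS_TO_MS_SETTINGS = %i[
  max_wait_timeout
  wait_backoff_on_queue_full
  wait_timeout_on_queue_full
  wait_backoff_on_transaction_command
].freeze

# Hypothetical helper: returns a copy of the settings with the affected
# values multiplied by 1000 and every other value left untouched.
def to_milliseconds(settings)
  settings.to_h do |name, value|
    converted = SECONDS_TO_MS_SETTINGS.include?(name) ? (value * 1_000).round : value
    [name, converted]
  end
end

# Example values as they might have looked before the upgrade (in seconds)
legacy = { max_wait_timeout: 30, wait_backoff_on_queue_full: 0.1, deliver: true }
migrated = to_milliseconds(legacy)
puts migrated.inspect
```

The resulting values can then be copied into the `producer.setup` block.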
Defaults Alignment
In this release, we've updated our default settings to address a crucial issue: previous defaults could lead to inconclusive outcomes in synchronous operations due to wait timeout errors. Users often mistakenly believed that a message dispatch was halted because of these errors when, in fact, the timeout was related to awaiting the final dispatch verdict, not the dispatch action itself.
The new defaults in WaterDrop 2.7.0 eliminate this confusion by ensuring synchronous operation results are always transparent and conclusive. This change aims to provide a straightforward understanding of wait timeout errors, reinforcing that they reflect the wait state, not the dispatch success.
Below, you can find a table with what has changed: the previous defaults and the new ones. Use the previous values if you want to retain the old behavior:
Config | Previous Default | New Default |
---|---|---|
root max_wait_timeout | 5000 ms (5 seconds) | 60000 ms (60 seconds) |
kafka message.timeout.ms | 300000 ms (5 minutes) | 50000 ms (50 seconds) |
kafka transaction.timeout.ms | 60000 ms (1 minute) | 55000 ms (55 seconds) |
This alignment ensures that when using sync operations or invoking #wait, any exception you get should give you a conclusive and final delivery verdict.
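One way to read the table above is as an ordering: with the new defaults, the root max_wait_timeout is longer than both Kafka-level timeouts, so the delivery verdict should arrive before the wait gives up. The check below is a plain-Ruby sketch of that reading, not WaterDrop code. `conclusive_wait?` is a hypothetical helper, and the numbers are the ones from the table.

```ruby
PREVIOUS_DEFAULTS = {
  max_wait_timeout: 5_000,
  'message.timeout.ms': 300_000,
  'transaction.timeout.ms': 60_000
}.freeze

NEW_DEFAULTS = {
  max_wait_timeout: 60_000,
  'message.timeout.ms': 50_000,
  'transaction.timeout.ms': 55_000
}.freeze

# Hypothetical check: the wait is considered conclusive when it outlasts
# both Kafka-level timeouts (all values in milliseconds).
def conclusive_wait?(settings)
  wait = settings.fetch(:max_wait_timeout)
  wait > settings.fetch(:'message.timeout.ms') &&
    wait > settings.fetch(:'transaction.timeout.ms')
end

puts conclusive_wait?(PREVIOUS_DEFAULTS) # false
puts conclusive_wait?(NEW_DEFAULTS)      # true
```

If you override any of these three values, keeping the same ordering is a reasonable sanity check.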
Buffering No Longer Early Validates Messages
As of version 2.7.0, WaterDrop has changed how message buffering works. Previously, messages underwent validation and middleware processing when they were buffered. Now, these steps are deferred until just before dispatching the messages. The buffer functions strictly as a thread-safe storage area without performing any validations or middleware operations until the messages are ready to be sent.
This adjustment was made primarily to ensure that middleware runs and validations are applied when most relevant—shortly before message dispatch. This approach addresses potential issues with buffers that might hold messages for extended periods:
- Temporal Relevance: Validating and processing messages near their dispatch time helps ensure that actions such as partition assignments reflect the current system state. This is crucial in dynamic environments where system states are subject to rapid changes.
- Stale State Management: By delaying validations and middleware to the dispatch phase, the system minimizes the risk of acting on outdated information, which could lead to incorrect processing or partitioning decisions.
# Prior to 2.7.0 this would raise an error
producer.buffer(topic: nil, payload: '')
# => WaterDrop::Errors::MessageInvalidError
# After 2.7.0 buffer will not, but flush_async will
producer.buffer(topic: nil, payload: '')
# => all good here
producer.flush_async
# => WaterDrop::Errors::MessageInvalidError
Middleware Execution Prior to Flush When Buffering
The timing of middleware execution has been adjusted. Middleware, which was previously run when messages were added to the buffer, will now only execute immediately before the messages are flushed from the buffer and dispatched. This change is similar to the validation-related changes.
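The deferred behavior described above can be pictured with a toy model. The class below is not WaterDrop's implementation. `ToyProducer`, its `InvalidMessage` error, and the `add_prefix` middleware are hypothetical, and the only thing the sketch aims to show is that buffering stores messages untouched while middleware and validation run at flush time.

```ruby
# Toy model only: this is NOT WaterDrop's implementation.
class ToyProducer
  InvalidMessage = Class.new(StandardError)

  def initialize(middleware: [])
    @middleware = middleware
    @buffer = []
    @mutex = Mutex.new
  end

  # Buffering only stores the message; nothing is validated or transformed.
  def buffer(message)
    @mutex.synchronize { @buffer << message }
    self
  end

  # Flushing runs middleware first, then validation, and returns the
  # messages that would be dispatched.
  def flush
    pending = @mutex.synchronize { @buffer.shift(@buffer.size) }
    pending.map do |message|
      processed = @middleware.reduce(message) { |msg, step| step.call(msg) }
      raise InvalidMessage, 'topic is required' if processed[:topic].nil?

      processed
    end
  end
end

# Hypothetical middleware that prefixes the topic name when one is present
add_prefix = ->(msg) { msg.merge(topic: msg[:topic] && "prod.#{msg[:topic]}") }
producer = ToyProducer.new(middleware: [add_prefix])

producer.buffer(topic: 'events', payload: 'a') # stored as-is, middleware not run yet
dispatched = producer.flush
puts dispatched.first[:topic]

producer.buffer(topic: nil, payload: '') # no error at buffering time
begin
  producer.flush
rescue ToyProducer::InvalidMessage => e
  puts "flush failed: #{e.message}"
end
```

In practice this means error handling that used to wrap buffering calls likely needs to wrap the flush calls instead.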
rdkafka Results wait_timeout No Longer Needed
If you used rdkafka handles with the wait API and passed the wait_timeout argument, you can drop it. It is no longer needed, as we have switched the operational model to async.
handler = Karafka.producer.produce_async(topic: 'my-topic', payload: 'my message')
# Replace this
handler.wait(wait_timeout: 0.1)
# With
handler.wait
Declarative Configs karafka topics migrate Config Awareness
Before Karafka 2.4, declarative topics could only manage topic creation and partitioning, not updating existing configurations to match Karafka's definitions. With version 2.4 and beyond, Karafka now updates all aspects of topic configurations to align with the settings defined in your code.
Be cautious: if you've manually adjusted topic settings outside Karafka and then run karafka topics migrate, Karafka's definitions will overwrite those manual changes. This ensures your topics are consistently configured as per your codebase, but it also means any out-of-sync manual configurations will be reset.
We recommend manually reviewing any changes made to your Kafka topics outside Karafka's declarative configurations before migrating. You can use the bundle exec karafka topics plan command to facilitate this. This command previews the changes Karafka intends to apply to your topics when running the migration process.
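Conceptually, a plan step compares what your code declares with what the cluster currently holds and reports the differences a migration would overwrite. The sketch below mimics that comparison with plain hashes. It is not how Karafka implements the command, and `plan_changes` together with the sample settings are hypothetical.

```ruby
# Hypothetical helper: lists settings whose current value differs from the
# declared one, which are the values a migration would overwrite.
def plan_changes(declared, actual)
  declared.each_with_object({}) do |(setting, wanted), changes|
    current = actual[setting]
    changes[setting] = { from: current, to: wanted } unless current == wanted
  end
end

# What the code declares for a topic
declared = { 'retention.ms' => '86400000', 'cleanup.policy' => 'delete' }
# What the cluster holds after someone changed retention by hand
actual = { 'retention.ms' => '604800000', 'cleanup.policy' => 'delete' }

plan_changes(declared, actual).each do |setting, change|
  puts "#{setting}: #{change[:from]} -> #{change[:to]}"
end
```

Any setting that shows up in such a diff is one to either copy into your declarative configuration or accept losing.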
Consumer Mapper Concept Removal
Karafka used to have a default strategy for building consumer group names. Each consumer group combined client_id
and the group name taken from the routing.
Below, you can find how Karafka pre 2.4
would remap the following routing:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
  end
  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end
    topic :example2 do
      consumer Example2Consumer
    end
    consumer_group :special do
      topic :super_topic do
        consumer Example2Consumer
      end
    end
  end
end
Karafka::Web.enable!
The above setup would create three consumer groups:
Group Name Before | Group Name After | Topics |
---|---|---|
my-app_app | app | example, example2 |
my-app_special | special | super_topic |
my-app_karafka_web | karafka_web | Web UI related topics |
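The "before" column in the table follows a simple pattern: the client_id, an underscore, and the group name from the routing. A minimal sketch of that pattern, assuming you used the default mapper (a custom mapper may have produced different names), with `legacy_group_id` being a hypothetical helper:

```ruby
# Reproduces the pre-2.4 default naming shown in the table above:
# "<client_id>_<group name from routing>".
def legacy_group_id(client_id, group_name)
  "#{client_id}_#{group_name}"
end

client_id = 'my-app'
%w[app special karafka_web].each do |group|
  puts "#{group} -> #{legacy_group_id(client_id, group)}"
end
```

These prefixed names are the ones to set explicitly after the upgrade if you want to keep consuming as the same consumer groups.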
The decision to retire the default consumer group naming strategy was driven by a need to address several issues and simplify the system for users as the framework evolved:
- The strategy was initially beneficial for setups involving multiple applications and tenants using Kafka, facilitating easier management and segregating consumer groups.
- However, as Karafka expanded, this approach led to complications in various areas, such as the Pro Iterator, Admin features, and the development of new streaming capabilities.
- Newcomers to Karafka, and those migrating from other frameworks, were often confused by it, highlighting the need for a more straightforward approach.
- A fundamental assumption made early in development, that consumer group mapping would only occur from Karafka to Kafka, no longer held. As the framework grew, the interaction between Karafka and Kafka became bidirectional, with Karafka also receiving consumer group information from Kafka. This raised several questions:
  - Whether and how to implement reverse mapping for consumer groups.
  - The feasibility and user responsibility in providing a reverse mapper or direct mapping.
  - Ensuring comprehensive coverage of all areas requiring mapping and remapping.
- Reports of inconsistent behavior and bugs related to consumer group mapping started emerging, underscoring the need to revise the strategy.
- The challenges became particularly apparent while enhancing Karafka::Admin with #read_lags for advanced consumer group management, reinforcing the view that the existing mapping functionality was more problematic than beneficial.
Given these considerations, the decision to retire the consumer mapper concept was made to streamline Karafka's architecture, enhance clarity and usability, and mitigate issues stemming from the outdated one-way mapping assumption.
Custom Mapper Naming Strategy Alignment
If you've used a custom mapper, align your naming to match the new setup according to your mapper behavior.
Below are details about the upgrade path specific to your usage patterns.
Removal of Custom Mappers
In addition to the changes mentioned below for upgrading to Karafka 2.4, you need to remove the custom mapper configuration from your setup, as it's no longer supported. Specifically, delete the config.consumer_mapper line from your karafka.rb configuration:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    #
    # Remove the following line as it's not supported in 2.4+
    config.consumer_mapper = MyCustomConsumerMapper.new
  end
end
Aligning the Simple Routing
When working without explicit consumer groups, all you need to do is alter the group_id setting to prefix it with the client_id value:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    # `app` without anything used to be the default name in the simple routing
    config.group_id = 'my-app_app'
  end
  # Simple routing is a routing without `consumer_group` blocks
  routes.draw do
    topic :topic1 do
      consumer EventsConsumer
    end
    topic :topic2 do
      consumer WebhooksConsumer
    end
  end
end
Aligning the Multiple Consumer Groups Mode
In the case of the Multiple Consumer Group Mode, you need to update the group_id as well as the names of particular consumer groups:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    # `app` without anything used to be the default name of the implicit group
    config.group_id = 'my-app_app'
  end
  # Routing that mixes implicit group topics with explicit `consumer_group` blocks
  routes.draw do
    # implicit group topics are covered with the `group_id` update
    topic :topic1 do
      consumer EventsConsumer
    end
    topic :topic2 do
      consumer WebhooksConsumer
    end
    # was: consumer_group 'group_special' do
    consumer_group 'my-app_group_special' do
      topic :topic3 do
        consumer SuperConsumer
      end
    end
  end
end
Aligning the Admin
Reconfigure Admin to use the extended group_id during Karafka configuration:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    config.admin.group_id = 'my-app_karafka_admin'
  end
  routes.draw do
    # ...
  end
end
Aligning the Web UI
Karafka Web UI injects its group, which the mapper also altered. To mitigate it, set the consumer group explicitly:
# Put the below before enabling the Web UI
Karafka::Web.setup do |config|
  # Add the prefix matching your client_id merged with the underscore
  config.group_id = 'my-app_karafka_web'
end
Karafka::Web.enable!
Aligning the CLI
Karafka CLI commands that accept consumer group names, such as bundle exec karafka server --exclude-consumer-groups group_name2,group_name3, must be aligned to use the prefixed names.
Commands related to topics and subscription groups are not affected.
# Assuming that client_id is `my-app`
# Replace
bundle exec karafka server --exclude-consumer-groups group_name2,group_name3
# With
bundle exec karafka server --exclude-consumer-groups my-app_group_name2,my-app_group_name3
#stop Based Iterator API Exiting
Until Karafka 2.4, if you wanted to stop the Pro Iterator, you could just exit its main messages loop like so:
iterator = Karafka::Pro::Iterator.new('my_topic')
iterator.each do |message|
  puts message.payload
  break
end
While this flow is still supported, the iterator object now has an explicit #stop method that will break the loop. This allows for more efficient resource management and post-execution cleanup. After you invoke #stop, the #each loop will stop yielding messages.
iterator = Karafka::Pro::Iterator.new('my_topic')
iterator.each do |message|
  puts message.payload
  iterator.stop
end
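Why an explicit stop can help with cleanup follows from how Ruby blocks work: a `break` in the caller's block unwinds out of the method that yielded, so any code placed after the loop inside that method is skipped, while a stop flag lets the loop finish on its own. The toy class below illustrates only this control flow. `ToyIterator` is hypothetical and is not how Karafka::Pro::Iterator is implemented.

```ruby
# Toy iterator: NOT Karafka::Pro::Iterator, only an illustration of the control flow.
class ToyIterator
  attr_reader :cleaned_up

  def initialize(messages)
    @messages = messages
    @stopped = false
    @cleaned_up = false
  end

  # Requests that the loop ends before the next message is yielded
  def stop
    @stopped = true
  end

  def each
    @messages.each do |message|
      break if @stopped

      yield message
    end
    # Reached only when the loop ends on its own; a `break` in the caller's
    # block unwinds out of #each and skips this line.
    @cleaned_up = true
  end
end

with_break = ToyIterator.new(%w[a b c])
with_break.each { |_message| break }

with_stop = ToyIterator.new(%w[a b c])
seen = []
with_stop.each do |message|
  seen << message
  with_stop.stop
end

puts with_break.cleaned_up # false
puts with_stop.cleaned_up  # true
puts seen.inspect          # ["a"]
```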
config.deserializer default becomes a Routing Option
Because of the introduction of key and header deserializers, the config.deserializer option became a per-topic routing option. If you relied on the default deserializer set on the configuration level, you should migrate it into the routing defaults block:
class KarafkaApp < Karafka::App
  setup do |config|
    # Remove this
    config.deserializer = AvroDefaultDeserializer.new
  end
  routes.draw do
    # Now you can define the default deserializer here:
    defaults do
      deserializer(AvroDefaultDeserializer.new)
    end
    topic :a do
      consumer ConsumerA
      # You can still overwrite defaults on a per-topic basis
      deserializer(JsonDeserializer.new)
    end
    topic :b do
      consumer ConsumerB
      deserializer(XmlDeserializer.new)
    end
  end
end
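For context, the objects passed to deserializer in the routing above can be sketched as follows. This assumes a deserializer is any object responding to #call that receives the message and reads its raw_payload, which is an assumption about the contract rather than something stated in this guide. `SymbolizedJsonDeserializer` and the `FakeMessage` stand-in are hypothetical and exist only so the sketch runs without the gem.

```ruby
require 'json'

# Stand-in for a Karafka message so the sketch runs without the gem.
FakeMessage = Struct.new(:raw_payload)

# Hypothetical payload deserializer: parses JSON and symbolizes the keys.
class SymbolizedJsonDeserializer
  def call(message)
    # Messages without a payload deserialize to nil
    return nil if message.raw_payload.nil?

    JSON.parse(message.raw_payload, symbolize_names: true)
  end
end

deserializer = SymbolizedJsonDeserializer.new
puts deserializer.call(FakeMessage.new('{"id":1}')).inspect
puts deserializer.call(FakeMessage.new(nil)).inspect
```

Under that assumption, an instance of such a class could be passed to deserializer in the defaults block or in a single topic block.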
Web UI Alignments
processing.consumer_group is now config.group_id
Karafka::Web.setup do |config|
  # Replace this
  config.processing.consumer_group = 'my-custom-web-consumer-group'
  # With
  config.group_id = 'my-custom-web-consumer-group'
end
karafka_consumers_commands Topic Introduction
Karafka 2.4 relies on Web UI 0.9+. Because of new concepts and capabilities introduced in Karafka Web UI 0.9, do not forget to run bundle exec karafka-web migrate after upgrading for every environment using Karafka Web UI before starting your consumers. This command execution is required despite Karafka having an automatic migration engine because of the introduction of the new karafka_consumers_commands topic.
Alternatively, if you created and manage Karafka Web UI topics manually, create the karafka_consumers_commands topic manually according to the settings available here.
Remember to assign proper permissions to appropriate consumer groups in ACLs based on your setup policies.
You can also customize this topic name by altering the topics.consumers.commands configuration option:
Karafka::Web.setup do |config|
  env_suffix = Rails.env.to_s
  config.topics.errors = "karafka_errors_#{env_suffix}"
  config.topics.consumers.reports = "karafka_consumers_reports_#{env_suffix}"
  config.topics.consumers.states = "karafka_consumers_states_#{env_suffix}"
  config.topics.consumers.metrics = "karafka_consumers_metrics_#{env_suffix}"
  config.topics.consumers.commands = "karafka_consumers_commands_#{env_suffix}"
end
Opting-Out From Commanding
Karafka 2.4 Pro and Enterprise introduce consumer management and probing UI. While they are turned on by default, if you do not want them, you can opt out by setting the Web UI commanding.active setting to false.
In such cases, relevant UI options and any backend-related functionalities will be completely disabled.
Karafka::Web.setup do |config|
  # Opt out if you do not want the commanding and probing UI capabilities
  config.commanding.active = false
end
Mandatory Topic Creation After Opt-Out
If you disable consumers management and probing UI in Karafka 2.4 by setting commanding.active to false, you must still create the relevant topic. Use bundle exec karafka-web migrate for automatic creation or add it manually. Skipping this step can lead to application issues, even with the feature turned off.
Karafka::Serializers::JSON::Deserializer renamed to Karafka::Deserializers::Payload
Due to other changes, Karafka::Serializers::JSON::Deserializer has been renamed to Karafka::Deserializers::Payload.
No APIs were changed, only the name. If you rely on this deserializer in your custom ones, change the reference:
# Change this
class CustomDeserializer < Karafka::Serializers::JSON::Deserializer
end
# to
class CustomDeserializer < Karafka::Deserializers::Payload
end