Upgrading to Karafka 2.0¶
Further Breaking Changes Beyond 2.0
This document explicitly covers the upgrade process from Karafka 1.4 to 2.0. If you plan to upgrade directly to a version higher than Karafka 2.0, such as Karafka 2.4, it is absolutely essential to review the upgrade guides for all intermediary versions. Notably, Karafka 2.4 introduces additional breaking changes that can be unexpected and potentially disruptive if you skip directly from 1.4 to 2.4 without following each version's upgrade path. Ignoring these steps may lead to significant issues in your application.
Pro & Enterprise Upgrade Support
If you're gearing up to upgrade to the latest Karafka version and are a Pro or Enterprise user, remember you've got a dedicated lifeline! Reach out via the dedicated Slack channel for direct support to ensure everything has been covered.
Karafka 2.0 is a major rewrite that brings many new things to the table but also removes certain concepts that, in retrospect, could have been designed better when I created them.
In this upgrade document, I will describe the most noticeable changes that require manual intervention to handle the upgrade process. This document does not cover new functionalities but aims to guide the upgrade process.
Before reading this article, please ensure you've read the Karafka framework 2.0 announcement blog post so you are familiar with the overall scope of changes.
Please note that many aspects of the Karafka upgrade are specific to your application. While we may not have covered all of the cases here, do not hesitate to ask questions either via Slack or by creating a GitHub issue.
These upgrade notes will be extended whenever someone points out missing content.
Note on concurrency¶
Karafka 2.0 is multi-threaded. This means that all of your code needs to be thread-safe.
Please keep in mind that Karafka does not guarantee that the same consumer will always run in the same Ruby thread, hence code like this:
def consume
Thread.current[:accumulator] ||= []
Thread.current[:accumulator] += messages.payloads
end
may cause severe problems and is not recommended.
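If you relied on such per-thread state, a minimal thread-safe sketch, assuming you only need to accumulate data within a single consumer instance's lifetime, is to keep the accumulator on the consumer instance instead (the accumulator name is just illustrative):

def consume
  # Consumer instance state instead of thread-local state;
  # a given consumer instance does not run its jobs concurrently with itself
  @accumulator ||= []
  @accumulator += messages.payloads
end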
Deploying Karafka 2.0 after the upgrade¶
To safely upgrade Karafka from 1.4 to 2.0, you need to completely shut down all the consumers of the application you are upgrading.
Gemfile alignment¶
- Update your karafka gem version reference:

  # Replace
  gem 'karafka', '~> 1.4'

  # with
  gem 'karafka', '~> 2.0'

- Remove the sidekiq-backend reference:

  # This needs to be removed
  gem 'karafka-sidekiq-backend', '~> 1.4'

- Update karafka-testing version reference:

  # Replace
  gem 'karafka-testing', '~> 1.4'

  # with
  gem 'karafka-testing', '~> 2.0'

- Run bundle install
WaterDrop (producer) update¶
- Remove WaterDrop setup code from your karafka.rb:

  # This can be safely removed
  monitor.subscribe('app.initialized') do
    WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
  end

- Remove direct WaterDrop listener references from your karafka.rb:

  # This can be safely removed
  Karafka.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new)
Settings alignment¶
Karafka 2.0 is powered by librdkafka. Because of that, we've decided to split the settings into two sections:
- karafka options - options directly related to the Karafka framework and its components.
- librdkafka options - options related to librdkafka.
All karafka options should be set at the root level, while all the librdkafka options need to go under the kafka scope:
class KarafkaApp < Karafka::App
setup do |config|
config.client_id = 'my_application'
# librdkafka configuration options need to be set as symbol values
config.kafka = {
'bootstrap.servers': '127.0.0.1:9092'
}
end
end
Below you can find some of the most significant naming changes in the configuration options:
Root options:
- start_from_beginning is now initial_offset and accepts either 'earliest' or 'latest'
- ssl_ca_certs_from_system is no longer needed, but the kafka security.protocol needs to be set to ssl
- batch_fetching is no longer needed
- batch_consuming is no longer needed
- serializer is no longer needed because Responders have been removed from Karafka
- topic_mapper is no longer needed as the concept of mapping topic names has been removed from Karafka
- backend is no longer needed because Karafka is now multi-threaded
- manual_offset_management now needs to be set on a per-topic basis
Kafka options:
- kafka.seed_brokers is now bootstrap.servers without the protocol definition
- kafka.heartbeat_interval is no longer needed
- SASL and SSL options changes are described in their own section
- Check out the configuration details of librdkafka for all the remaining options
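For illustration, a minimal sketch of how these renamed options look in a 2.0 setup (hosts and values below are placeholders):

class KarafkaApp < Karafka::App
  setup do |config|
    config.client_id = 'my_application'
    # Previously start_from_beginning; accepts 'earliest' or 'latest'
    config.initial_offset = 'earliest'
    config.kafka = {
      # Previously kafka.seed_brokers; no protocol prefix anymore
      'bootstrap.servers': 'my.kafka.host1:9092,my.kafka.host2:9092'
    }
  end
end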
Heartbeat no longer needed¶
Sending heartbeats is no longer needed. Both #trigger_heartbeat and #trigger_heartbeat! can be safely removed.
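For example, if your consumers contained explicit heartbeat calls similar to the hypothetical sketch below, you can simply delete those lines:

def consume
  messages.each do |message|
    # process_event is a placeholder for your own logic
    process_event(message)

    # No longer needed in Karafka 2.0 - remove this call
    # trigger_heartbeat
  end
end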
SASL, SSL, authentication¶
Please read the Deployment documentation to see the appropriate configuration for a given environment.
You can optionally check the librdkafka configuration documentation as well.
If you still struggle, feel free to reach out to us either via Slack or by creating a GitHub issue.
Manual offset management is now a per-topic setting¶
In Karafka 1.4 you could set config.manual_offset_management = true to make all the topics work with manual offset management.
This option is no longer available in 2.0 and needs to be set per topic as follows:
class KarafkaApp < Karafka::App
routes.draw do
consumer_group :events do
topic :user_events do
consumer EventsConsumer
manual_offset_management true
end
end
end
end
Ruby on Rails integration¶
Karafka 2.0 introduces seamless Ruby on Rails integration via Rails::Railtie without needing extra configuration.
Your karafka.rb should contain only Karafka-specific code; the rest is handled automatically. This means you need to reverse the manual setup steps that were required for Karafka 1.4 to work with Ruby on Rails.
- Remove any changes in the config/environment.rb of your Rails application related to Karafka:

  # environment.rb - this needs to be removed
  require Rails.root.join(Karafka.boot_file)

- Remove these lines from your karafka.rb:

  ENV['RAILS_ENV'] ||= 'development'
  ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']

  require ::File.expand_path('../config/environment', __FILE__)
  Rails.application.eager_load!

  if Rails.env.development?
    Rails.logger.extend(
      ActiveSupport::Logger.broadcast(
        ActiveSupport::Logger.new($stdout)
      )
    )
  end

- Remove the Rails logger assignment from your karafka.rb:

  class KarafkaApp < Karafka::App
    setup do |config|
      # ...
      # Remove this line
      config.logger = Rails.logger
    end
  end

- Remove the code reloader (if used):

  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    )
  )

- Remove the KarafkaApp.boot! call from the end of karafka.rb:

  # Remove this
  KarafkaApp.boot!
Consumer callbacks are no longer supported¶
All of the consumer callbacks were removed. They were replaced with the following lifecycle consumer methods:
- #revoked - runs the code you want to execute when a given topic partition has been revoked from a given consumer instance.
- #shutdown - runs the code you want to execute when the Karafka process is being shut down.
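A minimal sketch of a consumer using these lifecycle methods (the logging here is just illustrative):

class EventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each { |message| puts message.payload }
  end

  # Executed when this topic partition is revoked from this consumer instance
  def revoked
    Karafka.logger.info('Partition revoked, cleaning up')
  end

  # Executed when the Karafka process is being shut down
  def shutdown
    Karafka.logger.info('Shutting down, flushing state')
  end
end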
You can still use instrumentation hooks if you need to perform any actions not related to consumption, but please be aware that this code will not run in the same thread as the consumption.
Unified error message bus¶
Karafka 1.4 published errors under several instrumentation keys. Karafka 2.0 publishes all the errors under the same instrumentation event name: error.occurred.
The payload there always contains a type field that can be used to understand the origin of the issue.
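For example, a single subscription similar to the sketch below can replace the previous per-key error handlers (the logging is just a placeholder for your own error tracking):

Karafka.monitor.subscribe('error.occurred') do |event|
  # type describes the origin of the issue, error holds the exception
  Karafka.logger.error("#{event[:type]}: #{event[:error]}")
end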
Routing changes¶
The simple routing style now creates a single consumer group for all the topics defined at the root level.
This is a major change that can heavily impact your system.
If you used the "non consumer group" based routing, where all the topics were defined at the root level of the routing:
class KarafkaApp < Karafka::App
routes.draw do
topic :user_events do
consumer UsersEventsConsumer
end
topic :system_events do
consumer SystemEventsConsumer
end
topic :payment_events do
consumer PaymentEventsConsumer
end
end
end
Karafka 1.4 would create three separate consumer groups for you, while Karafka 2.0 will create only one:
# Karafka 1.4
Karafka::App.consumer_groups.count #=> 3
# Karafka 2.0
Karafka::App.consumer_groups.count #=> 1
To mitigate this you need to:
- List all the group names while in 1.4:

  Karafka::App.consumer_groups.map(&:name)
  #=> ['user_events', 'system_events', 'payment_events']

- Replicate this setup in Karafka 2.0 by creating three separate consumer groups directly:

  class KarafkaApp < Karafka::App
    routes.draw do
      consumer_group :user_events do
        topic :user_events do
          consumer UsersEventsConsumer
        end
      end

      consumer_group :system_events do
        topic :system_events do
          consumer SystemEventsConsumer
        end
      end

      consumer_group :payment_events do
        topic :payment_events do
          consumer PaymentEventsConsumer
        end
      end
    end
  end

- After applying the above changes, validate that the following command gives you the same results under 1.4 and 2.0:

  Karafka::App.consumer_groups.map(&:id)
  #=> ['example_app_user_events', 'example_app_system_events', 'example_app_payment_events']
Topic mappers are no longer supported¶
Note
Unless you used Heroku, you can probably skip this section.
Topic mapping is no longer supported. Please prefix all of your topic names with the value of the KAFKA_PREFIX environment variable yourself.
You can read more about integrating Karafka 2.0 with Heroku here.
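A minimal sketch of what this could look like in the routing, assuming KAFKA_PREFIX is set in the environment (the topic name is just an example):

class KarafkaApp < Karafka::App
  routes.draw do
    # Prefix the raw topic name yourself instead of relying on a topic mapper
    topic "#{ENV['KAFKA_PREFIX']}user_events" do
      consumer UsersEventsConsumer
    end
  end
end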
Pidfile and daemonization support has been removed¶
Quote from Mike:
- Don't daemonize, start Karafka with a process supervisor like systemd, upstart, foreman, etc.
- Log only to STDOUT, the entity starting Karafka can control where STDOUT redirects to, if any.
- PID files are a legacy of double forking and have no reason to exist anymore.
sidekiq-backend is no longer supported¶
Karafka 2.0 is multi-threaded.
If you use sidekiq-backend, you have two options:
- Pipe the jobs to Sidekiq yourself (see the sketch below)
- Leverage Karafka's multi-threading capabilities instead
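A minimal sketch of the first option, assuming a hypothetical EventJob Sidekiq worker defined in your application:

class EventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      # EventJob is a hypothetical Sidekiq worker; enqueue the raw payload
      EventJob.perform_async(message.raw_payload)
      mark_as_consumed(message)
    end
  end
end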
Responders are now replaced with Karafka.producer¶
Responders were a dead end.
Please use WaterDrop to produce messages via Karafka.producer:
- Replace direct responders usage with Karafka.producer usage:

  class ExampleResponder < ApplicationResponder
    topic :users_notified

    def respond(user)
      respond_to :users_notified, user
    end
  end

  ExampleResponder.call(User.last)

  With:

  Karafka.producer.produce_async(
    topic: 'users_notified',
    payload: user.to_json,
    partition_key: user.id.to_s
  )

- Replace all the #respond_with consumer responder invocations with direct Karafka.producer code.
Simple message consumption mode is no longer supported¶
You can achieve this functionality by slightly altering your consumer:
class SingleMessageBaseConsumer < Karafka::BaseConsumer
attr_reader :message
def consume
messages.each do |message|
@message = message
consume_one
mark_as_consumed(message)
end
end
end
class Consumer < SingleMessageBaseConsumer
def consume_one
puts "I received following message: #{message.payload}"
end
end
Here you can find more details about this.
Dependency changes¶
Karafka 2.0 no longer uses any of the dry-rb ecosystem libraries. If you've relied on them indirectly via Karafka, you will have to define them in your Gemfile.
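For example, if your own code used dry-validation or dry-struct directly (these two gems are just illustrations), add them explicitly:

# Gemfile - add only the dry-rb gems your application code actually uses
gem 'dry-validation'
gem 'dry-struct'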
Naming convention changes¶
#params_batch is now #messages:
def consume
# params_batch.each do |message|
messages.each do |message|
@message = message
consume_one
mark_as_consumed(message)
end
end
Instrumentation and monitoring¶
Instrumentation and monitoring are often application-specific.
The recommendation here is to revisit your current integration, align it with the events published by Karafka, and follow the 2.0 instrumentation guidelines document.
- Karafka::Instrumentation::StdoutListener is now Karafka::Instrumentation::LoggerListener
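For example, if you previously subscribed the stdout listener manually, the renamed listener can be wired the same way:

# Karafka 1.4
# Karafka.monitor.subscribe(Karafka::Instrumentation::StdoutListener.new)

# Karafka 2.0
Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)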
karafka-testing gem code adjustments¶
Alongside the recent updates to Karafka, the karafka-testing gem has also been updated. The comprehensive guides for the testing gem can be accessed here.
Tips and tricks¶
- The bootstrap.servers setting under kafka should not have a protocol, and it needs to be a string with comma-separated hosts.

  BAD:

  # protocol should not be here
  config.kafka = {
    'bootstrap.servers': 'kafka://my.kafka.host1:9092'
  }

  BAD:

  # it should be a comma-separated string, not an array
  config.kafka = {
    'bootstrap.servers': ['my.kafka.host1:9092', 'my.kafka.host2:9092']
  }

  GOOD:

  config.kafka = {
    'bootstrap.servers': 'my.kafka.host1:9092,my.kafka.host2:9092'
  }

- Make sure not to overwrite your settings in the way shown in the example below.

  BAD:

  config.kafka = {
    'bootstrap.servers': ENV.fetch('BROKERS').split(',')
  }

  # This section will FULLY overwrite the `bootstrap.servers`.
  # You want to merge those sections and NOT overwrite.
  if Rails.env.production?
    config.kafka = {
      'ssl.ca.pem': ENV.fetch('CA_CERT'),
      'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
      'ssl.key.pem': ENV.fetch('CLIENT_CERT_KEY'),
      'security.protocol': 'ssl'
    }
  end

  GOOD:

  config.kafka = {
    'bootstrap.servers': ENV.fetch('BROKERS').split(',')
  }

  if Rails.env.production?
    config.kafka.merge!({
      'ssl.ca.pem': ENV.fetch('CA_CERT'),
      'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
      'ssl.key.pem': ENV.fetch('CLIENT_CERT_KEY'),
      'security.protocol': 'ssl'
    })
  end