Skip to content

Upgrading to 2.0

Welcome to Karafka 2.0!

Karafka 2.0 is a major rewrite that brings many new things to the table but removes specific concepts that could have been better when I created them.

In this upgrade document, I will describe the most noticeable changes that require manual intervention to handle the upgrade process. This document does not cover new functionalities but aims to guide the upgrade process.

Before reading this article, please ensure you've read the Karafka framework 2.0 announcement blog post so you are familiar with the overall scope of changes.

Please note, that there are many aspects of Karafka upgrade that are specific to your application. While we may not have covered all of the cases here, do not hesitate to ask questions either via Slack or by creating a Github issue.

Those upgrade notes will be extended whenever someone points out any missing content.

Note on concurrency

Karafka 2.0 is multi-threaded. This means that all of your code needs to be thread-safe.

Please keep in mind that Karafka does not provide warranties, that the same consumer will always run in the same Ruby thread, hence code like this:

def consume
  Thread.current[:accumulator] ||= []
  Thread.current[:accumulator] += messages.payloads
end

may cause severe problems and is not recommended.

Deploying Karafka 2.0 after the upgrade

To safely upgrade from Karafka from 1.4 to 2.0 you need to shut down all your consumers completely.

Gemfile alignment

  1. Update your karafka gem version reference
# Replace
gem 'karafka', '~> 1.4'

# with
gem 'karafka', '~> 2.0'
  1. Remove the sidekiq-backend reference
# This needs to be removed
gem 'karafka-sidekiq-backend', '~> 1.4'
  1. Update karafka-testing version reference
# Replace
gem 'karafka-testing', '~> 1.4'

# with
gem 'karafka-testing', '~> 2.0'
  1. Run bundle install

WaterDrop (producer) update

  1. Remove WaterDrop setup code from your karafka.rb:
# This can be safely removed
monitor.subscribe('app.initialized') do
  WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
end

you can remove it.

  1. Remove direct WaterDrop listener references from your karafka.rb
# This can be safely removed
Karafka.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new)

Settings alignment

Karafka 2.0 is powered by librdkafka. Because of that, we've decided to split the settings into two sections:

  • karafka options - options directly related to the Karafka framework and its components.
  • librdkafka options - options related to librdkafka.

All karafka options should be set at the root level, while all the librdkafka options need to go under the kafka scope:

class KarafkaApp < Karafka::App
  setup do |config|
    config.client_id = 'my_application'
    # librdkafka configuration options need to be set as symbol values
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092'
    }
  end
end

Below you can find some of the most significant naming changes in the configuration options:

Root options:

  • start_from_beginning is now initial_offset and accepts either 'earliest' or 'latest'
  • ssl_ca_certs_from_system is no longer needed but kafka security.protocol needs to be set to ssl
  • batch_fetching is no longer needed
  • batch_consuming is no longer needed
  • serializer is no longer needed because Responders have been removed from Karafka
  • topic_mapper is no longer needed as concept of mapping topic names has been removed from Karafka
  • backend is no longer needed because Karafka is now multi-threaded with single-threaded backend
  • manual_offset_management needs to be set now on a per topic basis

Kafka options:

  • kafka.seed_brokers is now bootstrap.servers without the protocol definition
  • kafka.heartbeat_interval is no longer needed.
  • SASL and SSL options changes are described in their own section.
  • Check out the configuration details of librdkafka for all the remaining options.

Heartbeat no longer needed

Sending heartbeats is no longer needed. Both #trigger_heartbeat and trigger_heartbeat! can be safely removed.

SASL, SSL, authentication

Please read the Deployment documentation to see appropriate configuration for given environment.

You can optionally check the librdkafka configuration documentation as well.

If you still struggle, feel free to reach out to us either via Slack or by creating a Github issue.

Manual offset management is now a per-topic setting

In Karafka 1.4 you could set config.manual_offset_management = true to make all the topics work with manual offset management.

This option is no longer available in 2.0 and needs to be set per topic as followed:

class KarafkaApp < Karafka::App
  routes.draw do
    consumer_group :events do
      topic :user_events do
        consumer EventsConsumer
        manual_offset_management true
      end
    end
  end
end

Ruby on Rails integration

Karafka 2.0 introduces seamless Ruby on Rails integration via Rails::Railte without needing extra configuration.

Your karafka.rb should contain only Karafka-specific stuff, and the rest will be done automatically. This means you need to reverse the manual setup steps that were needed for Karafka 1.4 to work with Ruby on Rails.

  1. Remove any changes in the config/environment.rb of your Rails application related to Karafka:
# environment.rb - this needs to be removed
require Rails.root.join(Karafka.boot_file)
  1. Remove those lines from your karafka.rb:
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!

if Rails.env.development?
  Rails.logger.extend(
    ActiveSupport::Logger.broadcast(
      ActiveSupport::Logger.new($stdout)
    )
  )
end
  1. Remove the Rails logger assignment from your karafka.rb:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...

    # Remove this line
    config.logger = Rails.logger
  end
end
  1. Remove the code reloader (if used):
Karafka.monitor.subscribe(
  Karafka::CodeReloader.new(
    *Rails.application.reloaders
  )
)
  1. Remove the KarafkaApp.boot! from the end of karafka.rb:
# Remove this

KarafkaApp.boot!

Consumer callbacks are no longer supported

All of the consumer callbacks were removed. They were replaced with the following lifecycle consumer methods:

  • #revoked - runs the code you want to execute when a given topic partition has been revoked from a given consumer instance.
  • #shutdown - runs the code you want to execute when the Karafka process is being shut down.

You can still use instrumentation hooks if you need to perform any consumer, not related actions, but please be aware that this code will not run in the same thread as the consumption.

Unified error message bus

Karafka 1.4 published errors under several instrumentation keys. Karafka 2.0 publishes all the errors under the same instrumentation event name: error.occurred.

The payload there always contains a type field that can be used to understand the origin of the issue.

Routing changes

Simple routing style creates now a single consumer group for all the topics defined on a root level.

This is a major change that can heavily impact your system.

If you used the "non consumer group" based routing where all the topics would be defined on the root level of the routing:

class KarafkaApp < Karafka::App
  routes.draw do
    topic :user_events do
      consumer UsersEventsConsumer
    end

    topic :system_events do
      consumer SystemEventsConsumer
    end

    topic :payment_events do
      consumer PaymentEventsConsumer
    end
  end
end

Karafka 1.4 would create for you three separate consumer groups while Karafka 2.0 will create one:

# Karafka 1.4
Karafka::App.consumer_groups.count #=> 3

# Karafka 2.0
Karafka::App.consumer_groups.count #=> 1

To mitigate this you need to:

  1. List all the groups names while in 1.4:
Karafka::App.consumer_groups.map(&:name)
#=> ['user_events', 'system_events', 'payment_events']
  1. Replicate this setup in Karafka 2.0 by creating three separate consumer groups directly:
class KarafkaApp < Karafka::App
  routes.draw do
    consumer_group :user_events do
      topic :user_events do
        consumer UsersEventsConsumer
      end
    end

    consumer_group :system_events do
      topic :system_events do
        consumer SystemEventsConsumer
      end
    end

    consumer_group :payment_events do
      topic :payment_events do
        consumer PaymentEventsConsumer
      end
    end
  end
end
  1. After applying the above changes, validate, that the following command gives you same results under 1.4 and 2.0:
Karafka::App.consumer_groups.map(&:id)
#=> ['example_app_user_events', 'example_app_system_events', 'example_app_payment_events']

Topic mappers are no longer supported

Note: Unless you used Heroku, you can probably skip this section.

Topic mapping is no longer supported. Please prefix all of your topic names with the env KAFKA_PREFIX.

You can read more about integrating Karafka 2.0 with Heroku here.

Pidfile and daemonization support has been removed

Quote from Mike:

  • Don't daemonize, start Karafka with a process supervisor like systemd, upstart, foreman, etc.
  • Log only to STDOUT, the entity starting Karafka can control where STDOUT redirects to, if any.
  • PID files are a legacy of double forking and have no reason to exist anymore.

sidekiq-backend is no longer supported

Karafka 2.0 is multi-threaded.

If you use sidekiq-backend, you have two options:

  • Pipe the jobs to Sidekiq yourself
  • Elevate Karafka's multi-threading capabilities

Responders are now replaced with Karafka.producer

Responders were a dead end.

Please use Waterdrop to produce messages via Karafka.producer:

  1. Replace direct responders usage with Karafka.producer usage
class ExampleResponder < ApplicationResponder
  topic :users_notified

  def respond(user)
    respond_to :users_notified, user
  end
end

ExampleResponder.call(User.last)

With:

Karafka.producer.produce_async(
  topic: 'users_notified',
  payload: user.to_json,
  partition_key: user.id.to_s
)
  1. Replace all the #respond_with consumer responder invocations with direct Karafka.producer code

Simple message consumption mode is no longer supported

You can achieve this functionality by slightly altering your consumer:

class SingleMessageBaseConsumer < Karafka::BaseConsumer
  attr_reader :message

  def consume
    messages.each do |message|
      @message = message
      consume_one

      mark_as_consumed(message)
    end
  end
end

class Consumer < SingleMessageBaseConsumer
  def consume_one
    puts "I received following message: #{message.payload}"
  end
end

Here you can find more details about this.

Dependency changes

Karafka 2.0 no longer uses any of the dry-rb ecosystem libraries. If you've relied on them indirectly via Karafka, you will have to define them in your Gemfile.

Naming convention changes

#params_batch is now #messages:


def consume
  # params_batch.each do |message|
  messages.each do |message|
    @message = message
    consume_one

    mark_as_consumed(message)
  end
end

Instrumentation and monitoring

Instrumentation and monitoring are often application specific.

The recommendation here is to revisit your current integration and align it with the events published by Karafka and to follow the 2.0 instrumentation guidelines document.

  • Karafka::Instrumentation::StdoutListener is now Karafka::Instrumentation::LoggerListener

Tips and tricks

  1. bootstrap.servers setting under kafka should not have a protocol, and it needs to be a string with comma-separated hosts.

BAD:

# protocol should not be here
config.kafka = { 'bootstrap.servers': 'kafka://my.kafka.host1:9092' }

BAD:

# it should be a comma separate string, not an array
config.kafka = { 'bootstrap.servers': ['my.kafka.host1:9092', 'my.kafka.host2:9092'] }

GOOD:

config.kafka = { 'bootstrap.servers': 'my.kafka.host1:9092,my.kafka.host2:9092' }
  1. Make sure not to overwrite your settings similar to how it was done in the section below.

BAD:

config.kafka = {
  'bootstrap.servers': ENV.fetch('BROKERS').split(',')
}

# This section will FULLY overwrite the `bootstrap.servers`.
# You want to merge those sections and NOT overwrite.
if Rails.env.production?
  config.kafka = {
    'ssl.ca.pem':          ENV.fetch('CA_CERT'),
    'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
    'ssl.key.pem':         ENV.fetch('CLIENT_CERT_KEY'),
    'security.protocol':   'ssl'
  }
end

GOOD:

config.kafka = {
  'bootstrap.servers': ENV.fetch('BROKERS').split(',')
}

if Rails.env.production?
  config.kafka.merge!({
    'ssl.ca.pem':          ENV.fetch('CA_CERT'),
    'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
    'ssl.key.pem':         ENV.fetch('CLIENT_CERT_KEY'),
    'security.protocol':   'ssl'
  })
end