Upgrading to Karafka 2.4

Pro & Enterprise Upgrade Support

If you're gearing up to upgrade to the latest Karafka version and are a Pro or Enterprise user, remember you've got a dedicated lifeline! Reach out via the dedicated Slack channel for direct support to ensure everything has been covered.

As always, please make sure you have upgraded to the most recent version of 2.3 before upgrading to 2.4.

Also, remember to read and apply our standard upgrade procedures.

Ruby 2.7 is EOL

Ruby 2.7 is no longer supported. If you still use it, you cannot upgrade to Karafka 2.4.

WaterDrop 2.7 Changes

Karafka 2.4 relies on WaterDrop 2.7. If you use WaterDrop on its own and are upgrading only it, you can find the appropriate changelog here.

Below you can find the list of breaking changes in WaterDrop 2.7.

wait_timeout Configuration No Longer Needed

The wait_timeout WaterDrop configuration option is no longer needed. You can safely remove it.

producer = WaterDrop::Producer.new

producer.setup do |config|
  # Other config...

  # Remove this, no longer needed
  config.wait_timeout = 30
end

Time Settings Format Alignment

All time-related values are now configured in milliseconds instead of some being in seconds and some in milliseconds.

The values that were changed from seconds to milliseconds are:

  • max_wait_timeout
  • wait_backoff_on_queue_full
  • wait_timeout_on_queue_full
  • wait_backoff_on_transaction_command

If you have configured any of those yourself, please replace the seconds representation with milliseconds:

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true

  # Replace this:
  config.max_wait_timeout = 30

  # With
  config.max_wait_timeout = 30_000
  # ...
end
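
When several of these settings were customized, the conversion can be done mechanically. The sketch below is plain Ruby and does not touch WaterDrop itself; `SECONDS_BASED_KEYS` and `to_milliseconds` are hypothetical names used only for illustration, and the input hash stands in for whatever values you assigned before the upgrade.

```ruby
# Settings that WaterDrop 2.7 switched from seconds to milliseconds
# (taken from the list above).
SECONDS_BASED_KEYS = %i[
  max_wait_timeout
  wait_backoff_on_queue_full
  wait_timeout_on_queue_full
  wait_backoff_on_transaction_command
].freeze

# Hypothetical helper: converts pre-2.7 values (seconds) into
# milliseconds and leaves unrelated settings untouched.
def to_milliseconds(old_settings)
  old_settings.to_h do |key, value|
    converted = SECONDS_BASED_KEYS.include?(key) ? (value * 1_000).round : value
    [key, converted]
  end
end

# Example input: values as they might have been set before the upgrade.
old_settings = { max_wait_timeout: 30, wait_backoff_on_queue_full: 0.1, deliver: true }
new_settings = to_milliseconds(old_settings)

new_settings.each { |key, value| puts "config.#{key} = #{value.inspect}" }
```

The printed lines can be compared against your producer setup block to confirm that every time-based value is now expressed in milliseconds.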

Defaults Alignment

In this release, we've updated our default settings to address a crucial issue: previous defaults could lead to inconclusive outcomes in synchronous operations due to wait timeout errors. Users often mistakenly believed that a message dispatch was halted because of these errors when, in fact, the timeout was related to awaiting the final dispatch verdict, not the dispatch action itself.

The new defaults in WaterDrop 2.7.0 eliminate this confusion by ensuring synchronous operation results are always transparent and conclusive. This change aims to provide a straightforward understanding of wait timeout errors, reinforcing that they reflect the wait state, not the dispatch success.

Below is a table of the settings that changed, listing the previous defaults next to the new ones in case you want to retain the previous behavior:

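| Config scope | Setting                | Previous Default      | New Default           |
|--------------|------------------------|-----------------------|-----------------------|
| root         | max_wait_timeout       | 5000 ms (5 seconds)   | 60000 ms (60 seconds) |
| kafka        | message.timeout.ms     | 300000 ms (5 minutes) | 50000 ms (50 seconds) |
| kafka        | transaction.timeout.ms | 60000 ms (1 minute)   | 55000 ms (55 seconds) |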

This alignment ensures that when using sync operations or invoking #wait, any exception you get should give you a conclusive and final delivery verdict.
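
One way to read the table is that the new `max_wait_timeout` is longer than both Kafka-level timeouts, so the wait should outlast the delivery attempt. The plain Ruby sketch below encodes that reading; the `conclusive_wait?` helper is hypothetical, and the rule it checks is an interpretation of the text above rather than a documented WaterDrop guarantee.

```ruby
# Defaults from the table above, all in milliseconds.
PREVIOUS_DEFAULTS = {
  max_wait_timeout: 5_000,
  'message.timeout.ms': 300_000,
  'transaction.timeout.ms': 60_000
}.freeze

NEW_DEFAULTS = {
  max_wait_timeout: 60_000,
  'message.timeout.ms': 50_000,
  'transaction.timeout.ms': 55_000
}.freeze

# Hypothetical check: treats a sync wait as conclusive only when it lasts
# longer than both Kafka-level timeouts (an assumption drawn from the text).
def conclusive_wait?(settings)
  settings[:max_wait_timeout] > settings[:'message.timeout.ms'] &&
    settings[:max_wait_timeout] > settings[:'transaction.timeout.ms']
end

puts "previous defaults conclusive: #{conclusive_wait?(PREVIOUS_DEFAULTS)}"
puts "new defaults conclusive: #{conclusive_wait?(NEW_DEFAULTS)}"
```

If you override any of these three values, running your own numbers through a similar check can show whether a wait timeout could again fire before the final delivery verdict is known.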

Buffering No Longer Early Validates Messages

As of version 2.7.0, WaterDrop has changed how message buffering works. Previously, messages underwent validation and middleware processing when they were buffered. Now, these steps are deferred until just before dispatching the messages. The buffer functions strictly as a thread-safe storage area without performing any validations or middleware operations until the messages are ready to be sent.

This adjustment was made primarily to ensure that middleware runs and validations are applied when most relevant—shortly before message dispatch. This approach addresses potential issues with buffers that might hold messages for extended periods:

  • Temporal Relevance: Validating and processing messages near their dispatch time helps ensure that actions such as partition assignments reflect the current system state. This is crucial in dynamic environments where system states are subject to rapid changes.

  • Stale State Management: By delaying validations and middleware to the dispatch phase, the system minimizes the risk of acting on outdated information, which could lead to incorrect processing or partitioning decisions.

# Prior to 2.7.0 this would raise an error
producer.buffer(topic: nil, payload: '')
# => WaterDrop::Errors::MessageInvalidError

# After 2.7.0 buffer will not, but flush_async will
producer.buffer(topic: nil, payload: '')
# => all good here
producer.flush_async
# => WaterDrop::Errors::MessageInvalidError
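
The deferred validation described above can be modeled with a small toy class. This is not WaterDrop's implementation: `DeferredBuffer` and `InvalidMessageError` are hypothetical names, and the validation rule (a non-empty string topic) is a simplification chosen for the example.

```ruby
# Toy stand-in for a producer buffer; NOT WaterDrop's implementation.
class InvalidMessageError < StandardError; end

class DeferredBuffer
  def initialize
    @messages = []
    @mutex = Mutex.new
  end

  # Buffering only stores the message; nothing is validated here.
  def buffer(message)
    @mutex.synchronize { @messages << message }
    self
  end

  # Validation happens right before dispatch.
  def flush
    batch = @mutex.synchronize { @messages.shift(@messages.size) }
    batch.each { |message| validate!(message) }
    batch # a real producer would dispatch the batch here
  end

  private

  # Simplified rule: the topic must be a non-empty string.
  def validate!(message)
    return if message[:topic].is_a?(String) && !message[:topic].empty?

    raise InvalidMessageError, "invalid topic: #{message[:topic].inspect}"
  end
end

buffer = DeferredBuffer.new
buffer.buffer(topic: nil, payload: '') # accepted without complaint

begin
  buffer.flush
rescue InvalidMessageError => e
  puts "flush failed: #{e.message}"
end
```

In practice this means error handling that used to wrap buffering calls may need to move to wherever the flush happens.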

Middleware Execution Prior to Flush When Buffering

The timing of middleware execution has been adjusted. Middleware, which was previously run when messages were added to the buffer, will now only execute immediately before the messages are flushed from the buffer and dispatched. This change is similar to the validation-related changes.
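
A short plain Ruby illustration of why the timing matters: below, a hypothetical middleware picks a partition from the current partition count, and that count changes while the message waits in the buffer. The lambda, the variable names, and the checksum-based partitioning are all assumptions made for the example and do not reflect WaterDrop internals.

```ruby
# Toy model, not WaterDrop internals: middleware is applied at flush time.
partition_count = 3

# Hypothetical middleware: picks a partition using the *current* count.
assign_partition = lambda do |message|
  message.merge(partition: message[:key].sum % partition_count)
end

buffered = []
buffered << { topic: 'events', key: 'user-7', payload: 'a' }

# The topic is repartitioned while the message sits in the buffer.
partition_count = 12

# Flush: middleware runs now and sees the up-to-date partition count.
dispatched = buffered.map { |message| assign_partition.call(message) }
buffered.clear

puts dispatched.first[:partition]
```

Had the middleware run at buffering time, the partition would have been computed against the old count of 3.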

rdkafka Results wait_timeout No Longer Needed

If you called #wait on rdkafka delivery handles and passed the wait_timeout argument, you can drop it. It is no longer needed because the operational model has switched to async.

handler = Karafka.producer.produce_async(topic: 'my-topic', payload: 'my message')

# Replace this
handler.wait(wait_timeout: 0.1)

# With
handler.wait

Declarative Configs karafka topics migrate Config Awareness

Before Karafka 2.4, declarative topics could only manage topic creation and partitioning, not updating existing configurations to match Karafka's definitions. With version 2.4 and beyond, Karafka now updates all aspects of topic configurations to align with the settings defined in your code.

Be cautious: if you've manually adjusted topic settings outside Karafka and then run bundle exec karafka topics migrate, Karafka's definitions will overwrite those manual changes. This ensures your topics are consistently configured as per your codebase, but it also means any out-of-sync manual configurations will be reset.

We recommend manually reviewing any changes made to your Kafka topics outside Karafka's declarative configurations before migrating. You can use the bundle exec karafka topics plan command to facilitate this. This command previews the changes Karafka intends to apply to your topics when running the migration process.
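
Conceptually, such a preview is a diff between the settings declared in code and the settings currently on the cluster. The sketch below shows that idea in plain Ruby; `plan_changes` is a hypothetical helper, the setting values are made up, and none of this reflects how Karafka implements the plan command.

```ruby
# Conceptual sketch only: compares desired settings from code with the
# settings currently on the cluster. Not Karafka's implementation.
def plan_changes(desired, current)
  desired.each_with_object({}) do |(name, value), changes|
    existing = current[name]
    changes[name] = { from: existing, to: value } unless existing == value
  end
end

# Hypothetical values: retention.ms was hand-edited on the cluster.
desired = { 'retention.ms' => '86400000', 'cleanup.policy' => 'delete' }
current = { 'retention.ms' => '604800000', 'cleanup.policy' => 'delete' }

changes = plan_changes(desired, current)

changes.each do |name, change|
  puts "#{name}: #{change[:from]} -> #{change[:to]}"
end
```

Any setting that appears in such a diff is one that a migration would reset to the value from your code.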

Consumer Mapper Concept Removal

Karafka used to have a default strategy for building consumer group names. Each consumer group combined client_id and the group name taken from the routing.

Below, you can find how Karafka pre 2.4 would remap the following routing:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
  end

  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end

    topic :example2 do
      consumer Example2Consumer
    end

    consumer_group :special do
      topic :super_topic do
        consumer Example2Consumer
      end
    end
  end
end

Karafka::Web.enable!

The above setup would create three consumer groups:

| Group Name         | Topics                |
|--------------------|-----------------------|
| my-app_app         | example, example2     |
| my-app_special     | super_topic           |
| my-app_karafka_web | Web UI related topics |
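
The naming rule behind these names is simple enough to express in one line of Ruby. The `legacy_group_id` helper below is a hypothetical name; it only reproduces the `client_id`, underscore, group name pattern described above.

```ruby
# Mirrors the pre-2.4 default naming described above: client_id, an
# underscore, then the raw group name from the routing.
def legacy_group_id(client_id, raw_group_name)
  "#{client_id}_#{raw_group_name}"
end

%w[app special karafka_web].each do |raw_name|
  puts legacy_group_id('my-app', raw_name)
end
```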

The decision to retire the default consumer group naming strategy in Karafka was driven by a need to address several issues and simplify the system for users, especially as the framework has evolved. The reasons behind this change are:

  • The strategy was initially beneficial for setups involving multiple applications and tenants using Kafka, facilitating easier management and segregating consumer groups.

  • However, as Karafka expanded, this approach led to complications in various areas, such as the Pro Iterator, Admin features, and the development of new streaming capabilities.

  • Newcomers to Karafka, and those migrating from other frameworks, often found the implicit renaming confusing, which highlighted the need for a more straightforward approach.

  • A fundamental assumption made early in the development - that consumer group mapping would only occur from Karafka to Kafka - needed to be updated. As the framework grew, the interaction between Karafka and Kafka became bidirectional, with Karafka also receiving consumer group information from Kafka. This raised several questions:

    • Whether and how to implement reverse mapping for consumer groups.

    • The feasibility and user responsibility in providing a reverse mapper or direct mapping.

    • Ensuring comprehensive coverage of all areas requiring mapping and remapping.

  • Reports of inconsistent behavior and bugs related to consumer group mapping started emerging, underscoring the need to revise the strategy.

  • The challenges became particularly apparent while enhancing Karafka::Admin with #read_lags for advanced consumer group management, reinforcing the view that the existing mapping functionality was more problematic than beneficial.

Given these considerations, the decision to retire the consumer mapper concept was made to streamline Karafka's architecture, enhance clarity and usability, and mitigate issues stemming from the outdated one-way mapping assumption.

Custom Mapper Naming Strategy Alignment

If you've used a custom mapper, align your naming to match the new setup according to your mapper behavior.

Below are details about the upgrade path specific to your usage patterns.

Removal of Custom Mappers

In addition to the changes mentioned below for upgrading to Karafka 2.4, you need to remove the custom mapper configuration from your setup, as it's no longer supported. Specifically, delete the config.consumer_mapper line from your karafka.rb configuration:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    #
    # Remove the following line as it's not supported in 2.4+
    config.consumer_mapper = MyCustomConsumerMapper.new
  end
end
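
Before deleting the mapper, it can help to record the names it generated so you can set them explicitly afterwards. The sketch below is plain Ruby: the `MyCustomConsumerMapper` body and its `staging.` prefix are invented for illustration, the list of raw names is an assumption about a typical setup, and the `#call` interface with a raw group name is how pre-2.4 mappers are assumed to behave here.

```ruby
# Hypothetical custom mapper, standing in for your own class. It is
# assumed to respond to #call with the raw group name.
class MyCustomConsumerMapper
  def call(raw_consumer_group_name)
    "staging.#{raw_consumer_group_name}"
  end
end

mapper = MyCustomConsumerMapper.new

# Raw names assumed for this example: the implicit group, one explicit
# group, plus the Admin and Web UI groups.
raw_names = %w[app group_special karafka_admin karafka_web]

# Map each raw name to the name the mapper used to produce.
explicit_names = raw_names.to_h { |raw| [raw, mapper.call(raw)] }
explicit_names.each { |raw, mapped| puts "#{raw} => #{mapped}" }
```

The right-hand values are the names to use in `group_id`, `consumer_group` blocks, and the Admin and Web UI settings covered in the following sections.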

Aligning the Simple Routing

When working without explicit consumer groups, all you need to do is alter the group_id setting to prefix it with the client_id value:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    # `app` without anything used to be the default name in the simple routing
    config.group_id = 'my-app_app'
  end

  # Simple routing is a routing without `consumer_group` blocks
  routes.draw do
    topic :topic1 do
      consumer EventsConsumer
    end

    topic :topic2 do
      consumer WebhooksConsumer
    end
  end
end

Aligning the Multiple Consumer Groups Mode

In the Multiple Consumer Groups Mode, you need to update the group_id as well as the names of the explicitly defined consumer groups:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    # `app` without anything used to be the default name in the simple routing
    config.group_id = 'my-app_app'
  end

  # Routing that mixes implicit-group topics with explicit `consumer_group` blocks
  routes.draw do
    # implicit group topics are covered with the `group_id` update
    topic :topic1 do
      consumer EventsConsumer
    end

    topic :topic2 do
      consumer WebhooksConsumer
    end

    # was: consumer_group 'group_special' do
    consumer_group 'my-app_group_special' do
      topic :topic3 do
        consumer SuperConsumer
      end
    end
  end
end

Aligning the Admin

Reconfigure Admin to use the extended group_id during Karafka configuration:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    config.client_id = 'my-app'
    config.admin.group_id = 'my-app_karafka_admin'
  end

  routes.draw do
    # ...
  end
end

Aligning the Web UI

Karafka Web UI injects its own consumer group, whose name the mapper also altered. To keep using the same group, set its name explicitly:

# Put the below before enabling the Web UI

Karafka::Web.setup do |config|
  # Add the prefix matching your client_id merged with the underscore
  config.group_id = 'my-app_karafka_web'
end

Karafka::Web.enable!

Aligning the CLI

Karafka CLI commands that reference consumer group names, such as bundle exec karafka server --exclude-consumer-groups group_name2,group_name3, must be updated to use the prefixed names.

Commands related to topics and subscription groups are not affected.

# Assuming that client_id is `my-app`

# Replace
bundle exec karafka server --exclude-consumer-groups group_name2,group_name3

# With
bundle exec karafka server --exclude-consumer-groups my-app_group_name2,my-app_group_name3

#stop Based Iterator API Exiting

Until Karafka 2.4, if you wanted to stop the Pro Iterator, you could just exit its main messages loop like so:

iterator = Karafka::Pro::Iterator.new('my_topic')

iterator.each do |message|
  puts message.payload

  break
end

While this flow is still supported, the iterator object now has an explicit #stop method that will break the loop. This allows for more efficient resource management and post-execution cleanup. After you invoke #stop, the #each loop will stop yielding messages.

iterator = Karafka::Pro::Iterator.new('my_topic')

iterator.each do |message|
  puts message.payload

  iterator.stop
end
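
The general mechanic of a cooperative stop can be sketched with a toy class. `ToyIterator` below is hypothetical and is not how `Karafka::Pro::Iterator` is implemented; it only shows a stop flag that the loop checks after each yielded message, followed by cleanup once the loop has ended.

```ruby
# Toy iterator, not Karafka::Pro::Iterator: illustrates a cooperative
# stop flag that the loop checks after each yielded message.
class ToyIterator
  attr_reader :closed

  def initialize(messages)
    @messages = messages
    @stopped = false
    @closed = false
  end

  # Requests that the loop ends after the current message.
  def stop
    @stopped = true
  end

  def each
    @messages.each do |message|
      yield message
      break if @stopped
    end
  ensure
    # Cleanup (for example releasing a client) runs once the loop ends.
    @closed = true
  end
end

seen = []
iterator = ToyIterator.new(%w[a b c])

iterator.each do |message|
  seen << message
  iterator.stop if message == 'b'
end

puts seen.inspect
```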

config.deserializer default becomes a Routing Option

Because of the introduction of key and header deserializers, the config.deserializer option became a per-topic routing option. If you relied on the default deserializer set on the configuration level, you should migrate it into the routing defaults block:

class KarafkaApp < Karafka::App
  setup do |config|
    # Remove this
    config.deserializer = AvroDefaultDeserializer.new
  end

  routes.draw do
    # Now you can define the default deserializer here:
    defaults do
      deserializer(AvroDefaultDeserializer.new)
    end

    topic :a do
      consumer ConsumerA
      # You can still overwrite defaults on a per-topic basis
      deserializer(JsonDeserializer.new)
    end

    topic :b do
      consumer ConsumerB
      deserializer(XmlDeserializer.new)
    end
  end
end
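
For reference, a deserializer is an object that responds to `#call` and receives the message. The sketch below runs without Karafka: `FakeMessage` is a stand-in that models only `raw_payload`, and the `JsonDeserializer` body is a hypothetical implementation of the class named in the routing above.

```ruby
require 'json'

# Minimal stand-in for a Karafka message; only raw_payload is modeled.
FakeMessage = Struct.new(:raw_payload)

# Hypothetical deserializer: an object responding to #call(message).
class JsonDeserializer
  def call(message)
    return nil if message.raw_payload.nil? # tombstone: nothing to parse

    JSON.parse(message.raw_payload)
  end
end

deserializer = JsonDeserializer.new
puts deserializer.call(FakeMessage.new('{"id":1}')).inspect
```

An instance of such a class is what gets passed to `deserializer(...)` in the `defaults` block or in an individual topic.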

Web UI Alignments

processing.consumer_group is now config.group_id

Karafka::Web.setup do |config|
  # Replace this
  config.processing.consumer_group = 'my-custom-web-consumer-group'

  # With
  config.group_id = 'my-custom-web-consumer-group'
end

karafka_consumers_commands Topic Introduction

Karafka 2.4 relies on Web UI 0.9+. Because of new concepts and capabilities introduced in Karafka Web UI 0.9, do not forget to run bundle exec karafka-web migrate after upgrading for every environment using Karafka Web UI before starting your consumers. This command execution is required despite Karafka having an automatic migration engine because of the introduction of the new karafka_consumers_commands topic.

Alternatively, if you created and manage Karafka Web UI topics manually, create the karafka_consumers_commands topic manually according to the settings available here.

Remember to assign proper permissions to appropriate consumer groups in ACLs based on your setup policies.

You can also customize this topic name by altering the topics.consumers.commands configuration option:

Karafka::Web.setup do |config|
  env_suffix = Rails.env.to_s

  config.topics.errors = "karafka_errors_#{env_suffix}"
  config.topics.consumers.reports = "karafka_consumers_reports_#{env_suffix}"
  config.topics.consumers.states = "karafka_consumers_states_#{env_suffix}"
  config.topics.consumers.metrics = "karafka_consumers_metrics_#{env_suffix}"
  config.topics.consumers.commands = "karafka_consumers_commands_#{env_suffix}"
end

Opting-Out From Commanding

Karafka 2.4 Pro and Enterprise introduce a consumer management and probing UI. These capabilities are turned on by default. If you do not want them, you can opt out by setting the Web UI commanding.active setting to false. In such cases, relevant UI options and any backend-related functionalities will be completely disabled.

Karafka::Web.setup do |config|
  # Opt out if you do not want the commanding and probing UI capabilities
  config.commanding.active = false
end

Mandatory Topic Creation After Opt-Out

If you disable consumers management and probing UI in Karafka 2.4 by setting commanding.active to false, you must still create the relevant topic. Use bundle exec karafka-web migrate for automatic creation or add it manually. Skipping this step can lead to application issues, even with the feature turned off.

Karafka::Serializers::JSON::Deserializer Renamed to Karafka::Deserializers::Payload

Due to other changes, Karafka::Serializers::JSON::Deserializer has been renamed to Karafka::Deserializers::Payload.

No APIs were changed, only the name. If you rely on this deserializer in your custom ones, change the reference:

# Change this
class CustomDeserializer < Karafka::Serializers::JSON::Deserializer
end

# to
class CustomDeserializer < Karafka::Deserializers::Payload
end