
Upgrading to Karafka 2.5

Karafka 2.5 introduces several breaking changes, most of which align naming conventions with other Kafka ecosystem tools, such as Kafka Streams and Apache Flink. These changes are largely low-risk and focus on configuration alignment and naming conventions without affecting consumer group names or processing flows.

Pro & Enterprise Upgrade Support

If you're gearing up to upgrade to the latest Karafka version and are a Pro or Enterprise user, remember you've got a dedicated lifeline! Reach out via your dedicated Slack channel for direct support to ensure everything is covered.

As always, please make sure you have upgraded to the most recent version of 2.4 before upgrading to 2.5.

Also, remember to read and apply our standard upgrade procedures.

DLQ and Piping Header Prefix Change

The header prefix for DLQ (Dead Letter Queue) dispatched and piped messages has been changed from original_ to source_:

  • original_topic → source_topic
  • original_partition → source_partition
  • original_offset → source_offset
  • original_consumer_group → source_consumer_group
  • original_attempts → source_attempts

This change aligns Karafka's naming conventions with Kafka Streams and Apache Flink for better ecosystem consistency and future compatibility.
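
If your code reads these headers on DLQ or piped messages, the migration is a direct rename. Below is a minimal sketch of the updated access pattern, assuming a consumer subscribed to a DLQ topic (the class name and the recovery logic are illustrative):

class DlqConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      # Prior to 2.5 these headers used the original_ prefix,
      # e.g. message.headers['original_topic']
      source_topic     = message.headers['source_topic']
      source_partition = message.headers['source_partition']
      source_offset    = message.headers['source_offset']

      # application-specific recovery logic goes here
    end
  end
end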

Pro DLQ Message Key and Routing Changes

Karafka Pro's Dead Letter Queue has undergone important changes that improve message routing consistency and key preservation.

Key Preservation

The Pro DLQ now preserves the original message key instead of overwriting it with the source partition ID. This ensures that the original routing logic is maintained when messages are dispatched to the DLQ topic, preserving the source partition ordering as well.

Before:

# DLQ message key was set to the source partition ID
dlq_message[:key] = skippable_message.partition.to_s

After:

# DLQ message key preserves the original message key
dlq_message[:key] = skippable_message.raw_key
# Used for partition targeting
dlq_message[:partition_key] = skippable_message.partition.to_s

Removed source_key Header

The source_key header has been removed from DLQ dispatched messages since the original key is now fully preserved in the message key field.

Before:

# DLQ messages included source_key in headers
dlq_message[:headers] = {
  'source_topic' => topic.name,
  'source_partition' => source_partition,
  'source_offset' => skippable_message.offset.to_s,
  'source_consumer_group' => topic.consumer_group.id,
  # This header below is removed
  'source_key' => skippable_message.raw_key.to_s,
  'source_attempts' => attempt.to_s
}

After:

# source_key header is no longer included
dlq_message[:headers] = {
  'source_topic' => topic.name,
  'source_partition' => source_partition,
  'source_offset' => skippable_message.offset.to_s,
  'source_consumer_group' => topic.consumer_group.id,
  'source_attempts' => attempt.to_s
}

Partition Routing Consistency

The Pro DLQ now uses partition_key for consistent partition targeting while preserving the original message key. This ensures that:

  • Messages from the same source partition still route to the same DLQ partition
  • The original message key is preserved for application-level routing logic
  • Partition targeting is handled separately from message identification
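
In WaterDrop (Karafka's producer), partition_key is used to pick the target partition, while key travels with the record untouched. A minimal, illustrative dispatch sketch reusing skippable_message from the snippets above (the topic name is a placeholder):

Karafka.producer.produce_async(
  topic: 'orders_dlq',
  payload: skippable_message.raw_payload,
  # Preserved for application-level routing and identification
  key: skippable_message.raw_key,
  # Drives partition selection, so messages from one source partition
  # keep landing in a single DLQ partition
  partition_key: skippable_message.partition.to_s
)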

Impact on Your Code

Mid-Risk Changes:

  1. Key Access: If you were accessing the DLQ message key expecting it to be the source partition, update your code to use the source_partition header instead
  2. Header Access: Remove any code that accesses the source_key header from DLQ messages, as this information is now available in the DLQ message key
  3. Custom DLQ Enhancement: Review any #enhance_dlq_message implementations that modify the key field

Critical Partitioning Change:

While ordering within partitions is preserved, DLQ message partitioning behavior has changed. Previously, DLQ messages were partitioned based on the source partition ID assigned as the message key. Now, the message key holds the original key and partition targeting relies on partition_key, which may result in messages being distributed differently across DLQ partitions after the upgrade.

Messages that previously went to the same DLQ partition will still all go to one partition, but it may not be the same partition as before the upgrade.

This could affect:

  • DLQ consumer processing order expectations
  • Partition-specific recovery strategies
  • Monitoring and alerting based on DLQ partition distribution

Maintaining Previous Partitioning Behavior:

If you need to maintain the previous partitioning behavior (routing based on source partition), you can use #enhance_dlq_message to override the key:

class MyConsumer < Karafka::BaseConsumer
  def consume
    # some code that can raise an error...
  end

  private

  def enhance_dlq_message(dlq_message, skippable_message)
    # Restore previous partitioning behavior by using source partition as key
    dlq_message[:key] = skippable_message.partition.to_s

    # Optionally preserve original key in headers for reference
    dlq_message[:headers]['source_key'] = skippable_message.raw_key.to_s
  end
end

Example Migration:

# Before - accessing source info from DLQ message
def process_dlq_message(message)
  source_partition = message.key.to_i  # This was the partition ID
  original_key = message.headers['source_key']  # Original key was in headers
end

# After - updated access pattern
def process_dlq_message(message)
  source_partition = message.headers['source_partition'].to_i  # Use header
  original_key = message.key  # Original key is preserved
end

These changes enhance the Pro DLQ's consistency with Kafka's native partitioning behavior while maintaining backward compatibility for most use cases.

Web UI Topic Configuration Structure

Additional Web UI Changes

Aside from the change described below, there are other subtle changes to the Web UI. Full upgrade docs for Web UI 0.11 are available here.

The Web UI component configuration has been restructured to use a nested format with the .name property for topic redefinitions:

Before:

config.topics.consumers.reports = "app_karafka_web_consumers_reports"
config.topics.consumers.states = "app_karafka_web_consumers_states"
config.topics.consumers.metrics = "app_karafka_web_consumers_metrics"
config.topics.consumers.commands = "app_karafka_web_consumers_commands"
config.topics.errors = "app_karafka_web_errors"

After:

config.topics.consumers.reports.name = "app_karafka_web_consumers_reports"
config.topics.consumers.states.name = "app_karafka_web_consumers_states"
config.topics.consumers.metrics.name = "app_karafka_web_consumers_metrics"
config.topics.consumers.commands.name = "app_karafka_web_consumers_commands"
config.topics.errors.name = "app_karafka_web_errors"

This change aligns with the namespacing pattern used across Karafka components, providing a consistent configuration approach for topic naming. The direct assignment has been replaced with a nested structure where each topic configuration is a separate object with its own properties, with .name being used to define the actual Kafka topic name.

Recurring Tasks Topic Configuration Structure (Pro)

The scheduled jobs topics configuration has been restructured to use a nested format:

Before:

config.recurring_tasks.topics.schedules = "karafka_recurring_tasks_schedules"
config.recurring_tasks.topics.logs = "karafka_recurring_tasks_logs"

After:

config.recurring_tasks.topics.schedules.name = "karafka_recurring_tasks_schedules"
config.recurring_tasks.topics.logs.name = "karafka_recurring_tasks_logs"

This change aligns with the Web UI's topic namespacing pattern of using topic.name.

Kubernetes Health Check Response Format Change

The Kubernetes health check listeners now return a 200 OK status with a JSON response body instead of the previous 204 No Content empty response. This change provides more detailed health information while maintaining backward compatibility for most monitoring setups.

Before:

HTTP/1.1 204 No Content
Content-Type: text/plain

(empty body)

After:

HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 156

{
  "status": "healthy",
  "timestamp": 1717251446,
  "port": 9000,
  "process_id": 12345,
  "errors": {
    "polling_ttl_exceeded": false,
    "consumption_ttl_exceeded": false,
    "unrecoverable": false
  }
}

This change is considered low-risk as most monitoring systems and Kubernetes liveness probes that check for any successful HTTP status (2xx range) will continue working without modification. Most health check integrations rely on the HTTP success status rather than the specific response code or content type, making this transition seamless for most deployments.

However, if your monitoring infrastructure performs strict status code validation expecting exactly 204, you must update it to accept 200 or any other 2xx status code. Similarly, any tooling that explicitly expects a text/plain content type or an empty response body will need adjustment to handle the new application/json payload.

The new JSON response format provides granular health information that can improve your observability setup. You can now implement more sophisticated monitoring and alerting based on specific error indicators, debug health check failures with detailed status information, and track process-level metrics such as the process ID, port, and timestamp directly from the health endpoint.
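
As an illustration, a monitoring script can inspect the individual error flags instead of relying only on the status code. A minimal sketch, assuming the health listener is reachable locally on port 9000 (the host, port, and alerting logic are placeholders):

require 'net/http'
require 'json'

response = Net::HTTP.get_response(URI('http://127.0.0.1:9000/'))
health = JSON.parse(response.body)

# Alert on specific error indicators rather than only the 2xx status
if health['errors'].values.any?
  warn "Karafka process #{health['process_id']} reports issues: #{health['errors']}"
end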

Admin Configuration Options Update

The admin configuration options have been updated to optimize retryable operations and improve clarity:

Before:

config.admin.max_attempts = 60

After:

config.admin.max_retries_duration = 60_000  # 60 seconds in milliseconds
config.admin.retry_backoff = 500            # 500ms between retries

This change affects how Karafka handles admin operations like topic creation, deletion, and partition management. The new approach uses time-based retries instead of attempt-based ones, providing more predictable behavior and better resource management.

It is a low-risk change, as most users never modify these admin configuration options. They will automatically benefit from the improved retry logic without any code changes.

If you have customized admin retry behavior, please update your configuration to use the new time-based approach. For example, if you previously set max_attempts = 30 with the default 1-second wait time (30 seconds total), you would now set max_retries_duration = 30_000 to maintain equivalent behavior.
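
For instance, that migration could look as follows in your configuration (the values are illustrative):

# Before (2.4): up to 30 attempts with the default 1-second wait, ~30 seconds total
# config.admin.max_attempts = 30

# After (2.5): the same overall time budget, expressed in milliseconds
config.admin.max_retries_duration = 30_000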

The new configuration provides better control over admin operation timeouts and prevents excessive metadata requests to Kafka clusters, which is particularly beneficial when working with topics with hundreds of partitions.

Impact Assessment

  • Low Risk: Most of these changes are naming-related and don't affect consumer group names, processing logic, or system stability. The Pro DLQ partitioning change described above is the main exception and deserves a closer review.
  • Migration: Simple search-and-replace operations should be sufficient for most codebases:
  1. Search for uses of original_ in your message header access code and replace with source_
  2. Update all Web UI topic configurations to use the .name property if you are using custom Web UI topic names
  3. For Pro users, update the recurring tasks topic configuration structure
  4. Run your test suite to verify everything works as expected

These changes enhance future compatibility without significantly reworking your processing flows or message-handling logic.
