# Upgrading to Karafka 2.6

!!! warning "Not Yet Released"

    Karafka 2.6 has **not** been released yet. This document is a work in progress and reflects the planned changes. Details may change before the final release.

Karafka 2.6 is a significant release built around one central theme: preparing the internals for **Kafka Share Groups** (KIP-932). To support a fundamentally new group type alongside the existing consumer group model, a large portion of the processing, routing, connection, and instrumentation layers has been reorganized into consistent namespaces. This was the mandatory first step - the internal consistency work had to land before any Share Group functionality could be layered on top.

Beyond the structural groundwork, 2.6 ships a redesigned Declarative Topics system, a new low-level partition offset query API, and dynamic worker pool scaling.

!!! tip "Pro & Enterprise Upgrade Support"

    If you're gearing up to upgrade to the latest Karafka version and are a Pro or Enterprise user, remember you've got a dedicated lifeline! Reach out via the dedicated Slack channel for direct support to ensure everything has been covered.

As always, please make sure you have upgraded to the most recent version of `2.5` before upgrading to `2.6`.

Also, remember to read and apply our standard [upgrade procedures](https://karafka.io/docs/Upgrades-Upgrading.md).

## Upgrading to Karafka 2.6 / Breaking Changes

### Upgrading to Karafka 2.6 / Breaking Changes / Pause Configuration Flat Methods Removed

The flat pause configuration methods (`config.pause_timeout`, `config.pause_max_timeout`, `config.pause_with_exponential_backoff`) that were deprecated in Karafka 2.5.2 have been removed.
Use the nested `config.pause.*` namespace instead:

```ruby
# Before (no longer works)
config.pause_timeout = 1_000
config.pause_max_timeout = 30_000
config.pause_with_exponential_backoff = true

# After
config.pause.timeout = 1_000
config.pause.max_timeout = 30_000
config.pause.with_exponential_backoff = true
```

## Upgrading to Karafka 2.6 / New Declarative Topics API

The Declarative Topics system has been redesigned with a standalone `declaratives.draw` DSL that lives independently of routing. This change resolves a fundamental mismatch that grew more visible as Karafka adoption spread: many teams use Karafka as the single source of truth for their entire Kafka topic infrastructure, not just the topics their application consumes.

Under the old model, topic declarations were embedded inside routing blocks. This meant that to declare and manage a topic - whether for another team's service, a shared audit log, or a produce-only sink - you had to add it to your routing, which implies consumption intent and brings all the routing validation and consumer machinery along with it. The two concerns leaked into each other in a way that could not scale cleanly.

Separating declarations from routing makes the purpose of each explicit: routing describes what your application consumes and how, while declaratives describe what Kafka topics exist and how they are configured. The two are now related only when you choose them to be - by having a routing entry for a topic you also declare.

All existing CLI commands (`migrate`, `plan`, `create`, `delete`, `align`, `repartition`) now read from the shared declaratives repository regardless of how a topic was registered.
### Upgrading to Karafka 2.6 / New Declarative Topics API / Standalone DSL

Topic declarations are defined using `declaratives.draw` at the application level:

```ruby
class KarafkaApp < Karafka::App
  declaratives.draw do
    topic :orders do
      partitions 6
      replication_factor 3
      config(
        'retention.ms': 86_400_000,
        'cleanup.policy': 'delete'
      )
    end

    topic :events do
      partitions 10
      replication_factor 3
    end
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
    end
  end
end
```

If not specified, the defaults are `partitions: 1` and `replication_factor: 1`.

### Upgrading to Karafka 2.6 / New Declarative Topics API / Defaults Support

You can define default values that apply to all topics in the block. Topic-specific values override them:

```ruby
class KarafkaApp < Karafka::App
  declaratives.draw do
    defaults do
      partitions 5
      replication_factor 3
      config('retention.ms': 604_800_000)
    end

    topic :orders do
      partitions 10 # Overrides the default of 5
    end

    topic :events
    # Inherits: 5 partitions, 3 replication_factor, 7-day retention
  end
end
```

### Upgrading to Karafka 2.6 / New Declarative Topics API / Multiple Draw Blocks

You can call `declaratives.draw` more than once. Each call is additive and accumulates topic declarations.
This is useful for splitting declarations across initializers or plugins:

```ruby
class KarafkaApp < Karafka::App
  declaratives.draw do
    topic :orders do
      partitions 6
    end
  end

  declaratives.draw do
    topic :events do
      partitions 10
    end
  end
end
```

### Upgrading to Karafka 2.6 / New Declarative Topics API / Managing Topics You Do Not Consume

Because declarations are decoupled from routing, you can manage topics owned by other services or topics used only for producing:

```ruby
class KarafkaApp < Karafka::App
  declaratives.draw do
    topic :orders do
      partitions 6
      replication_factor 3
    end

    # Owned by a different service - managed here for infrastructure consistency
    topic :external_events do
      partitions 10
      replication_factor 3
      config('retention.ms': 604_800_000)
    end

    # Produce-only topic, no routing entry needed
    topic :audit_log do
      partitions 3
      replication_factor 3
      config('cleanup.policy': 'compact')
    end
  end
end
```

### Upgrading to Karafka 2.6 / New Declarative Topics API / Legacy Routing-Based Configuration is Deprecated

!!! warning "Deprecation Notice"

    Defining topic configuration via the routing `#config` method is deprecated. It continues to work in 2.6 for backwards compatibility, but will be removed in a future major release. All new topic declarations should use the standalone `declaratives.draw` DSL.

The routing-based `config()` approach still functions and populates the same shared repository. When a topic is declared in both places, the standalone `declaratives.draw` declaration takes precedence.
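
As a rough illustration of the precedence rule, the following plain-Ruby sketch models a shared repository that accepts declarations from both paths. It is not Karafka's implementation: `ToyDeclarativesRepository`, `register`, and the `source:` argument are hypothetical names, and it assumes precedence applies to the declaration as a whole rather than being merged setting by setting.

```ruby
# Hypothetical model of a shared declarations repository. This is NOT
# Karafka's implementation; the names and the whole-declaration precedence
# rule are assumptions made for illustration only.
class ToyDeclarativesRepository
  Declaration = Struct.new(:topic, :settings, :source)

  def initialize
    @declarations = {}
  end

  # source is :standalone (declaratives.draw) or :routing (legacy config())
  def register(topic, settings, source:)
    existing = @declarations[topic]

    # A routing-based declaration never replaces a standalone one
    return existing if existing && existing.source == :standalone && source == :routing

    @declarations[topic] = Declaration.new(topic, settings, source)
  end

  def fetch(topic)
    @declarations.fetch(topic)
  end
end

repo = ToyDeclarativesRepository.new
repo.register(:orders, { partitions: 3 }, source: :routing)
repo.register(:orders, { partitions: 6, replication_factor: 3 }, source: :standalone)
repo.register(:orders, { partitions: 12 }, source: :routing)

# The standalone declaration is the one that survives, whatever the order
puts repo.fetch(:orders).settings
```

In this model the registration order does not matter: once a standalone declaration exists for a topic, later routing-based ones are ignored.
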
To migrate, move your `config()` calls from routing into a `declaratives.draw` block:

```ruby
# Before (deprecated, still works)
routes.draw do
  topic :orders do
    config(partitions: 6, replication_factor: 3, 'retention.ms': 86_400_000)
    consumer OrdersConsumer
  end
end

# After (recommended)
declaratives.draw do
  topic :orders do
    partitions 6
    replication_factor 3
    config('retention.ms': 86_400_000)
  end
end

routes.draw do
  topic :orders do
    consumer OrdersConsumer
  end
end
```

## Upgrading to Karafka 2.6 / Internal Namespace Reorganization

The largest set of changes in 2.6 is the reorganization of internal classes under `ConsumerGroups` namespaces across the processing, routing, connection, and instrumentation layers. This is **step one** of the work required to bring Kafka Share Groups (KIP-932) into Karafka - Share Groups need their own parallel set of strategies, coordinators, jobs, and callbacks, and that is only cleanly achievable once the existing consumer-group-specific code lives in a dedicated namespace.

!!! info "No Impact on Documented Public APIs"

    All documented public APIs - consumer classes, routing DSL, configuration options, instrumentation event names and payloads - are unchanged. If you only use what is covered in the official documentation, you will **not** be affected by any of the moves below.

!!! warning "Advanced Users: Internal Class References"

    If your code references internal Karafka classes directly by constant path (for example, to override a strategy, swap a coordinator, or hook into a jobs builder), those constants have moved. The changes are mechanical renames with no behavioral difference. Review the list below and update any direct references.
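
Because the renames are mechanical, a small script can flag stale references before you upgrade. The sketch below is a hypothetical helper, not something Karafka ships: `MOVED_CONSTANTS` and `moved_constant_references` are illustrative names, and the map holds only a few of the moved constants, so extend it with the ones your code touches.

```ruby
# Hypothetical helper, not part of Karafka: reports which pre-2.6 constant
# paths appear in a piece of source code. Only a few of the moved constants
# are included here; extend the map with the ones your code uses.
MOVED_CONSTANTS = {
  'Karafka::Processing::Coordinator' => 'Karafka::Processing::ConsumerGroups::Coordinator',
  'Karafka::Processing::JobsBuilder' => 'Karafka::Processing::ConsumerGroups::JobsBuilder',
  'Karafka::Processing::Strategies' => 'Karafka::Processing::ConsumerGroups::Strategies',
  'Karafka::Connection::RebalanceManager' => 'Karafka::Connection::ConsumerGroups::RebalanceManager'
}.freeze

# Returns an array of [old_path, new_path] pairs found in the given source text
def moved_constant_references(source)
  MOVED_CONSTANTS.select do |old_path, _new_path|
    # \b on both sides keeps Coordinator from matching CoordinatorsBuffer
    source.match?(/\b#{Regexp.escape(old_path)}\b/)
  end.to_a
end

sample = <<~RUBY
  class MyCoordinator < Karafka::Processing::Coordinator
  end
RUBY

moved_constant_references(sample).each do |old_path, new_path|
  puts "#{old_path} -> #{new_path}"
end
```

Paths that already use the `ConsumerGroups` namespace are not reported, so the check can be re-run after each batch of fixes.
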

The moved constants include (OSS):

- `Karafka::Processing::Coordinator` → `Karafka::Processing::ConsumerGroups::Coordinator`
- `Karafka::Processing::CoordinatorsBuffer` → `Karafka::Processing::ConsumerGroups::CoordinatorsBuffer`
- `Karafka::Processing::Executor` → `Karafka::Processing::ConsumerGroups::Executor`
- `Karafka::Processing::ExecutorsBuffer` → `Karafka::Processing::ConsumerGroups::ExecutorsBuffer`
- `Karafka::Processing::Partitioner` → `Karafka::Processing::ConsumerGroups::Partitioner`
- `Karafka::Processing::ExpansionsSelector` → `Karafka::Processing::ConsumerGroups::ExpansionsSelector`
- `Karafka::Processing::Strategies::*` → `Karafka::Processing::ConsumerGroups::Strategies::*`
- `Karafka::Processing::StrategySelector` → `Karafka::Processing::ConsumerGroups::StrategySelector`
- `Karafka::Processing::Jobs::Consume` → `Karafka::Processing::ConsumerGroups::Jobs::Consume`
- `Karafka::Processing::Jobs::Eofed` → `Karafka::Processing::ConsumerGroups::Jobs::Eofed`
- `Karafka::Processing::Jobs::Revoked` → `Karafka::Processing::ConsumerGroups::Jobs::Revoked`
- `Karafka::Processing::Jobs::Shutdown` → `Karafka::Processing::ConsumerGroups::Jobs::Shutdown`
- `Karafka::Processing::Jobs::Idle` → `Karafka::Processing::ConsumerGroups::Jobs::Idle`
- `Karafka::Processing::JobsBuilder` → `Karafka::Processing::ConsumerGroups::JobsBuilder`
- `Karafka::Connection::RebalanceManager` → `Karafka::Connection::ConsumerGroups::RebalanceManager`
- `Karafka::Instrumentation::Callbacks::Rebalance` → `Karafka::Instrumentation::Callbacks::ConsumerGroups::Rebalance`
- `Karafka::Instrumentation::Callbacks::Error` → `Karafka::Instrumentation::Callbacks::ConsumerGroups::Error`
- `Karafka::Instrumentation::Callbacks::Statistics` → `Karafka::Instrumentation::Callbacks::ConsumerGroups::Statistics`
- `Karafka::Routing::Features::ActiveJob` → `Karafka::Routing::Features::ConsumerGroups::ActiveJob`
- `Karafka::Routing::Features::DeadLetterQueue` → `Karafka::Routing::Features::ConsumerGroups::DeadLetterQueue`
- `Karafka::Routing::Features::Eofed` → `Karafka::Routing::Features::ConsumerGroups::Eofed`
- `Karafka::Routing::Features::ManualOffsetManagement` → `Karafka::Routing::Features::ConsumerGroups::ManualOffsetManagement`

The following config internal settings have been nested under `config.internal.processing.consumer_groups`:

- `config.internal.processing.coordinator_class`
- `config.internal.processing.executor_class`
- `config.internal.processing.partitioner_class`
- `config.internal.processing.strategy_selector`
- `config.internal.processing.expansions_selector`
- `config.internal.processing.errors_tracker_class`
- `config.internal.processing.jobs_builder`

Shared settings (`jobs_queue_class`, `scheduler_class`, `worker_job_call_wrapper`) remain at the `config.internal.processing` level and are unaffected.

### Upgrading to Karafka 2.6 / Internal Namespace Reorganization / Routing Group Accessor Changes

`Routing::Topic#group` and `Routing::SubscriptionGroup#group` have been introduced as polymorphic accessors. `#consumer_group` is kept as a backwards-compatible alias and will be retired in Karafka 3.0 once Share Groups land.

Instrumentation payloads now emit parallel `group:` / `group_id:` keys alongside the existing `consumer_group:` / `consumer_group_id:` keys - the legacy keys remain present and unchanged.

## Upgrading to Karafka 2.6 / New Admin API: Reading Partition Offsets

`Karafka::Admin.read_partition_offsets` is a new low-level method for querying partition offsets without a consumer group. It supports `:earliest`, `:latest`, `:max_timestamp`, and millisecond timestamp specs across multiple topics and partitions in a single call. Pass `isolation_level: Karafka::Admin::IsolationLevels::READ_COMMITTED` to get the Last Stable Offset (LSO) instead of the high-watermark for accurate lag computation on transactionally-produced topics.
See the [Admin API](https://karafka.io/docs/Infrastructure-Admin-API.md) for full usage details.

## Upgrading to Karafka 2.6 / Dynamic Worker Pool Scaling

The worker thread pool can now be scaled at runtime without restarting the process. This is useful for adapting concurrency to time-of-day load patterns, responding to external signals, or gradually ramping up after a deployment.

The pool is accessible via `Karafka::Server.workers` and exposes a single `#scale(target)` method:

```ruby
# Increase to 10 workers (adds threads immediately)
Karafka::Server.workers.scale(10)

# Reduce to 3 workers (workers exit gracefully as they finish current jobs)
Karafka::Server.workers.scale(3)

# Check the current pool size
Karafka::Server.workers.size
```

Scaling up is synchronous - new threads are started and registered before `#scale` returns, and `#size` reflects the new count immediately. Scaling down is asynchronous - nil sentinels are enqueued and workers exit when they pick one up, so `#size` decreases gradually as workers finish their current job and deregister.

The pool enforces a minimum of 1 worker regardless of the target passed.

Both directions emit instrumentation events you can subscribe to:

```ruby
Karafka.monitor.subscribe('worker.scaling.up') do |event|
  pool = event[:workers_pool]
  puts "Workers scaled up from #{event[:from]} to #{event[:to]}"
end

Karafka.monitor.subscribe('worker.scaling.down') do |event|
  pool = event[:workers_pool]
  puts "Workers scaling down from #{event[:from]} to #{event[:to]} (draining)"
end
```

A practical pattern is to drive scaling from an external source such as a signal handler, a scheduled check, or a metrics threshold:

```ruby
# Scale workers based on time-of-day in an initializer or a recurring task
Thread.new do
  loop do
    target = Time.now.hour.between?(8, 20) ? 10 : 3
    Karafka::Server.workers.scale(target)
    sleep 60
  end
end
```

!!! note "Relationship with `config.concurrency`"

    `config.concurrency` sets the initial pool size at boot.
    `#scale` adjusts the running pool at runtime without affecting the configuration value. After a restart, the pool will initialize again from `config.concurrency`.

## Upgrading to Karafka 2.6 / Ractors Deferred from 2.6

Ractor-based parallel deserialization was implemented and is functional, but has been intentionally excluded from this release. The reasoning is straightforward: 2.6 already contains a large volume of internal structural changes. Shipping Ractors on top of that would have made it significantly harder to reason about the source of any issues that surface after the release. Ractors will be introduced in a subsequent release once the 2.6 internals stabilize in production.

## Upgrading to Karafka 2.6 / Performance Improvements

Several internal admin operations that previously issued N sequential per-partition consumer calls have been replaced with batched admin calls:

- `Admin::Topics#read_watermark_offsets` now issues two batch calls (`:earliest` and `:latest`) regardless of how many topics or partitions are queried, instead of N sequential calls.
- The time-based offset resolution fallback in `Admin::ConsumerGroups#seek` now uses a single batch call.
- Pro `Iterator::TplBuilder` negative offset resolution is now handled with three total calls regardless of partition count.

## Upgrading to Karafka 2.6 / Summary of Actions Required

For most applications, the upgrade from 2.5 to 2.6 requires only one action:

- Update `config.pause_timeout`, `config.pause_max_timeout`, and `config.pause_with_exponential_backoff` to `config.pause.timeout`, `config.pause.max_timeout`, and `config.pause.with_exponential_backoff`.

If your code references internal Karafka classes by constant path, review the namespace moves in the Internal Namespace Reorganization section above.

Migrating from routing-based `config()` to `declaratives.draw` is encouraged but **not** required for 2.6 - the old API continues to work.

---

*Last modified: 2026-05-31 17:27:42*