KIP-932 for rdkafka-ruby¶
Work in Progress
This document is a work in progress used for development purposes. It is subject to constant changes as it tracks research and development work related to KIP-932. It should not be used for any assumptions about future APIs, features, or implementation details.
KIP-932 introduces Share Groups, a new consumption model for Kafka that enables queue-like semantics. Unlike traditional consumer groups where each partition is assigned to exactly one consumer, share groups allow multiple consumers to cooperatively consume from the same partitions without exclusive assignment.
This document analyzes the changes required in rdkafka-ruby to support KIP-932.
Critical Dependency: librdkafka¶
rdkafka-ruby is an FFI wrapper around librdkafka. All KIP-932 functionality depends on librdkafka implementing share group support first.
| Component | Status | Notes |
|---|---|---|
| librdkafka KIP-932 | Not started | No public issues or PRs as of research date |
| rdkafka-ruby bindings | Blocked | Waiting for librdkafka implementation |
Once librdkafka implements KIP-932, rdkafka-ruby will need to expose the new APIs through FFI bindings.
KIP-932 Overview¶
What Are Share Groups?¶
Share groups represent a new consumption model where:
| Aspect | Consumer Groups (Current) | Share Groups (KIP-932) |
|---|---|---|
| Partition Access | Exclusive per consumer | Shared/cooperative |
| Scaling | Limited by partition count | Can exceed partition count |
| Record Handling | Offset-based commits | Per-record acknowledgment |
| Delivery Tracking | Implicit via offsets | Explicit delivery counts |
| Rebalancing | Client or server-side | Server-side only (SimpleAssignor) |
| Group States | Multiple (PREPARING_REBALANCE, etc.) | Simple (EMPTY, STABLE, DEAD) |
Key Concepts¶
- At-least-once delivery: Records can be redelivered if not acknowledged
- Per-record acknowledgment: ACCEPT, RELEASE, or REJECT each record
- Acquisition locks: Records are locked for processing (default 30s)
- Delivery attempt tracking: Max 5 attempts by default, then archived
- Record states: Available → Acquired → Acknowledged/Archived
New Protocol Components¶
ShareFetch/ShareAcknowledgeRPCsShareGroupHeartbeatfor membership management- Internal topic:
__share_group_state - Offsets: SPSO (Share-Partition Start Offset), SPEO (Share-Partition End Offset)
Current rdkafka-ruby Architecture¶
Consumer Structure¶
rdkafka-ruby uses Ruby FFI to bind to librdkafka's C API:
Rdkafka::Config
├── consumer() → Rdkafka::Consumer
├── producer() → Rdkafka::Producer
└── admin() → Rdkafka::Admin
Rdkafka::Consumer
├── subscribe(topics)
├── poll(timeout)
├── each { |message| ... }
├── commit(message, async)
├── store_offset(message)
├── pause/resume(topic_partition_list)
├── seek(topic_partition)
└── close()
Statistics Callback¶
rdkafka-ruby exposes librdkafka statistics via a callback mechanism:
config = Rdkafka::Config.new(
'bootstrap.servers': 'localhost:9092',
'statistics.interval.ms': 5000
)
config.consumer_statistics_callback = ->(stats) {
# stats is a Hash parsed from librdkafka's JSON statistics
# Contains: cgrp (consumer group), topics, brokers, etc.
}
The cgrp (consumer group) section currently includes:
state- Consumer group statestateage- Time in state (microseconds)join_state- Join state for rebalancingrebalance_age- Time since last rebalancerebalance_cnt- Rebalance countrebalance_reason- Last rebalance reason
Error Handling¶
Errors are exposed via Rdkafka::RdkafkaError:
begin
consumer.commit(message)
rescue Rdkafka::RdkafkaError => e
case e.code
when :unknown_member_id
# Handle error
end
end
Required librdkafka APIs¶
For rdkafka-ruby to support KIP-932, librdkafka must expose APIs for:
- Share group subscription - Subscribe to topics as a share group
- Acknowledgment - ACCEPT, RELEASE, or REJECT individual records
- Batch acknowledgment - Acknowledge multiple records at once
- Commit - Commit pending acknowledgments (for explicit mode)
- Delivery attempt - Get the delivery attempt count for a message
- Statistics - Share group metrics in the statistics JSON
The exact API signatures will depend on librdkafka's implementation decisions.
API Design: Extended Consumer Class¶
Following librdkafka's approach, rdkafka-ruby will extend the existing Consumer class rather than creating a separate class. The group type is determined by configuration.
New Consumer Methods¶
module Rdkafka
class Consumer
# Existing methods remain unchanged...
# NEW: Share group acknowledgment
# @param message [Message] The message to acknowledge
# @param type [Symbol] :accept, :release, or :reject
def acknowledge(message, type = :accept)
# FFI call to librdkafka acknowledge function
end
# NEW: Batch acknowledgment
# @param messages [Array<Message>] Messages to acknowledge
# @param type [Symbol] :accept, :release, or :reject
def acknowledge_batch(messages, type = :accept)
# FFI call to librdkafka batch acknowledge function
end
# NEW: Check if this consumer is using a share group
def share_group?
# Check group.type config
end
end
end
New Constants¶
module Rdkafka
# Share group acknowledgment types
module ShareAckType
ACCEPT = :accept
RELEASE = :release
REJECT = :reject
end
# Share group states (for statistics parsing)
module ShareGroupState
EMPTY = 'empty'
STABLE = 'stable'
DEAD = 'dead'
end
end
Message Extensions¶
module Rdkafka
class Message
# NEW: Delivery attempt count (share groups)
# Returns 1 for first delivery, increments on redelivery
def delivery_attempt
# FFI call to get delivery attempt from native message
end
# NEW: Check if this is a redelivered message
def redelivered?
delivery_attempt > 1
end
end
end
Method Availability¶
When using a share group consumer, certain methods don't apply:
| Method | Consumer Group | Share Group |
|---|---|---|
subscribe |
Yes | Yes |
poll |
Yes | Yes |
each |
Yes | Yes |
commit |
Yes | Yes (commits acknowledgments) |
acknowledge |
No | Yes |
acknowledge_batch |
No | Yes |
store_offset |
Yes | No (raises error) |
seek |
Yes | No (raises error) |
pause/resume |
Yes | No (raises error) |
Methods that don't apply to share groups will raise a descriptive error when called.
Statistics Exposure¶
Share Group Statistics Structure¶
When librdkafka exposes share group statistics, they will likely appear in a separate section (e.g., sgrp) from consumer group stats (cgrp):
config.consumer_statistics_callback = ->(stats) {
if stats['sgrp'] # Share group stats
sg = stats['sgrp']
puts "Share group state: #{sg['state']}"
puts "Member ID: #{sg['member_id']}"
sg['topics']&.each do |topic, topic_stats|
topic_stats['partitions']&.each do |partition_id, p_stats|
puts "#{topic}[#{partition_id}]:"
puts " SPSO: #{p_stats['spso']}"
puts " SPEO: #{p_stats['speo']}"
puts " In-flight: #{p_stats['in_flight']}"
puts " Acquired: #{p_stats['acquired']}"
end
end
end
}
Statistics Schema Differences¶
| Metric | Consumer Group | Share Group |
|---|---|---|
| Group state | cgrp.state (up, etc.) |
sgrp.state (empty/stable/dead) |
| Join state | cgrp.join_state |
N/A (no client-side joining) |
| Rebalance | cgrp.rebalance_* |
N/A (server-side assignment) |
| Partition lag | cgrp.topics.*.partitions.*.lag |
N/A (different semantics) |
| SPSO/SPEO | N/A | sgrp.topics.*.partitions.*.spso/speo |
| In-flight | N/A | sgrp.topics.*.partitions.*.in_flight |
| Acquired | N/A | sgrp.topics.*.partitions.*.acquired |
Error Handling¶
New Error Codes¶
KIP-932 introduces new error codes that librdkafka will expose:
| Error Code | Description | Handling |
|---|---|---|
SHARE_SESSION_NOT_FOUND |
Share session expired/invalid | Re-establish session |
SHARE_PARTITION_NOT_FOUND |
Partition not available | Retry or resubscribe |
INVALID_SHARE_SESSION_EPOCH |
Stale session epoch | Re-poll to get new epoch |
FENCED_STATE_EPOCH |
State epoch mismatch | Internal retry |
INVALID_RECORD_STATE |
Invalid state transition | Check acknowledgment logic |
Error Class Extensions¶
module Rdkafka
class RdkafkaError < StandardError
# Existing error codes...
# NEW: Helper to check if error is share group related
def share_group_error?
# Check against share group error codes
end
end
end
Configuration Options¶
Share Group Specific Settings¶
These configuration options will be passed through to librdkafka:
| Config Key | Default | Description |
|---|---|---|
group.type |
consumer |
Group type: consumer or share |
group.share.record.lock.duration.ms |
30000 |
Acquisition lock timeout |
group.share.heartbeat.interval.ms |
3000 |
Heartbeat frequency |
group.share.session.timeout.ms |
45000 |
Session timeout |
group.share.min.record.lock.duration.ms |
15000 |
Minimum lock duration |
group.share.max.record.lock.duration.ms |
60000 |
Maximum lock duration |
Configuration Example¶
config = Rdkafka::Config.new(
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-share-group',
'group.type': 'share',
'group.share.record.lock.duration.ms': 30000,
'group.share.heartbeat.interval.ms': 3000,
'group.share.session.timeout.ms': 45000
)
consumer = config.consumer
Acknowledgment Mode¶
KIP-932 defines two acknowledgment modes:
- Implicit: Records acknowledged via
poll()orcommit() - Explicit: Per-record acknowledgment via
acknowledge()
# Implicit mode (auto-acknowledge on next poll)
config = Rdkafka::Config.new(
# ...
'share.acknowledgement.mode': 'implicit'
)
# Explicit mode (must call acknowledge)
config = Rdkafka::Config.new(
# ...
'share.acknowledgement.mode': 'explicit'
)
Usage Example¶
# Create share group consumer
config = Rdkafka::Config.new(
'bootstrap.servers': 'localhost:9092',
'group.id': 'job-processors',
'group.type': 'share',
'share.acknowledgement.mode': 'explicit'
)
consumer = config.consumer
consumer.subscribe(['jobs'])
# Process messages
consumer.each do |message|
puts "Processing job (attempt #{message.delivery_attempt})"
begin
process_job(message.payload)
consumer.acknowledge(message, :accept)
rescue TransientError
# Release for immediate redelivery to any consumer
consumer.acknowledge(message, :release)
rescue PermanentError
# Reject - won't be redelivered (archived after max attempts)
consumer.acknowledge(message, :reject)
end
end
Open Questions¶
librdkafka Design Dependencies¶
- What will the statistics JSON structure look like for share groups?
- Will there be new callbacks for share group events (member joined, etc.)?
- How will librdkafka handle implicit vs explicit acknowledgment mode?
rdkafka-ruby API Decisions¶
- How should batch acknowledgment handle partial failures?
- Should statistics parsing be unified or separate for consumer/share groups?
Compatibility¶
- Minimum librdkafka version required for KIP-932?
- Kafka broker version requirements (4.0+)?
- Will there be feature detection for share group support?
Last modified: 2025-12-17 17:51:02