Offset management (checkpointing)

By default, Karafka handles offset commit management for you. The offset is automatically committed:

  • frequently (defaults to once every 5 seconds), as defined by the auto.commit.interval.ms setting at the kafka level (see the sketch after this list).
  • during the shutdown after all processing is done.
  • during a rebalance, after all blocking processing is done and before the new assignment is distributed.
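
If the default five-second cadence does not fit your workload, you can tune it through the kafka section of the Karafka configuration, which is passed through to librdkafka. Below is a minimal sketch; the broker address is a placeholder, and 5_000 ms is the default value:

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      # Placeholder broker address
      'bootstrap.servers': '127.0.0.1:9092',
      # How often offsets are committed automatically (librdkafka default: 5s)
      'auto.commit.interval.ms': 5_000
    }
  end
end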

This approach works well for most cases and minimizes the number of scenarios in which reprocessing happens during normal operations. However, there are some situations where you might need finer control over offset management.

Manual offset management

There are several cases in which this API can be helpful:

  • Implementing in-memory DDD sagas,
  • Buffering,
  • Simulating transactions (see the sketch after this list).
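
As an illustration of the last case, here is a minimal, hypothetical sketch of simulating a transaction: payloads are persisted inside a database transaction, and the batch is marked as consumed only after that transaction commits. It assumes manual_offset_management is enabled for the topic (covered in the next section) and that an ActiveRecord Event model exists:

class EventsConsumer < ApplicationConsumer
  def consume
    # Persist all payloads atomically; if this raises, nothing is marked
    # as consumed and the batch will be processed again after a restart
    ActiveRecord::Base.transaction do
      messages.payloads.each { |payload| Event.create!(payload) }
    end

    # Mark only after the DB transaction has committed
    mark_as_consumed(messages.last)
  end
end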

Configuring Karafka not to mark messages as consumed automatically

To use this API, you need to switch the manual_offset_management setting to true on a per-topic basis:

class KarafkaApp < Karafka::App
  routes.draw do
    consumer_group :events do
      # Offsets are managed automatically by default
      # (manual_offset_management false)

      topic :user_events do
        consumer EventsConsumer
        manual_offset_management true
      end
    end
  end
end

Marking messages as consumed

To mark a given message as consumed (so that in case of a crash or restart, it won't be consumed again), you can use one of two marking methods:

  • #mark_as_consumed - for a non-blocking, eventual offset commitment.
  • #mark_as_consumed! - for a blocking offset commitment that stops the processing flow until the offset has been stored. This is not recommended for most scenarios, as Karafka automatically commits the most recent offsets upon rebalance and shutdown.

def consume
  # Do something with messages
  EventStore.store(messages.payloads)
  # And now mark the last message as consumed,
  # so we won't consume any of the already processed messages again
  mark_as_consumed! messages.last
end

Both #mark_as_consumed and #mark_as_consumed! return a boolean value indicating whether your consumer instance still owns the given topic partition. If a rebalance occurred and the partition is no longer owned by this consumer, the returned value will be false. You can use this result to stop processing early:

def consume
  messages.each do |message|
    puts "Processing #{message.topic}/#{message.partition} #{message.offset}"

    # Do not process further if we no longer own partition
    return unless mark_as_consumed(message)
  end
end

Example buffer implementation with shutdown DB flush

When manually controlling the moment of marking messages as consumed, it is also worth taking the graceful application termination process into consideration.

In some cases, shutdown may be exactly the moment when you want to flush the buffer, even though it has not reached the desired threshold. You can also use #mark_as_consumed (and #mark_as_consumed!) from the #shutdown method:

class EventsConsumer < ApplicationConsumer
  # Flush to DB only in 1k batches
  FLUSH_SIZE = 1000

  def consume
    # Add each message to the buffer
    messages.each { |message| buffer << message }

    # If the buffer has reached FLUSH_SIZE, it's time to put the data into the DB
    if buffer.size >= FLUSH_SIZE
      data = buffer.shift(FLUSH_SIZE)
      p "importing: #{data.count}"
      # Once importing is done, we can mark the last message from the
      # imported set as consumed
      mark_as_consumed!(data.last)
    end
  end

  # Before we stop, if there is anything in the buffer, let's import it
  # despite the fact that it didn't reach the FLUSH_SIZE
  def shutdown
    unless buffer.empty?
      p "importing: #{buffer.count}"
      # Mark the last message as consumed, as all of them are now in the DB
      mark_as_consumed!(buffer.last)
    end
  end

  def buffer
    @buffer ||= []
  end
end

Mixing manual and automatic offset management

You can still take advantage of this API even when using automatic offset management. For example, you may want to commit the offset manually after a certain number of messages are consumed from a batch:

class CountersConsumer < ApplicationConsumer
  def consume
    messages.each_with_index do |message, index|
      # Some business logic here
      # Mark every 10th message as consumed
      mark_as_consumed(message) if (index + 1) % 10 == 0
    end
  end
end
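
Note that because automatic offset management remains enabled in this example, any trailing messages that do not land on a multiple of ten are still committed automatically on the regular interval, as well as during shutdown and rebalances.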