Administration (topics management and cluster info)

Karafka provides a few simple admin functions for getting cluster info, creating and deleting topics, and reading topic messages.

Note: Admin actions will always be applied to the default cluster defined in the configuration.

Creating a topic

topic_name = 'my_cool_topic'
partitions_count = 2
replication_factor = 1 # 1 is enough for development; use a higher value in production

Karafka::Admin.create_topic(topic_name, partitions_count, replication_factor)

Deleting a topic

topic_name = 'my_cool_topic'

Karafka::Admin.delete_topic(topic_name)

Getting cluster info

# Get cluster info and list all the topics
info = Karafka::Admin.cluster_info

puts info.topics.map { |topic| topic[:topic_name] }.join(', ')
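
The calls above can be combined to create a topic only when the cluster does not list it yet. The following is a minimal sketch, not something Karafka ships: `ensure_topic` is a hypothetical helper, and its `admin` argument is assumed to be `Karafka::Admin` or any object that responds to `cluster_info` and `create_topic` in the same way.

```ruby
# Hypothetical helper (not part of Karafka): creates a topic only if the
# cluster does not already list it. `admin` is assumed to be Karafka::Admin
# or any object exposing the same `cluster_info` and `create_topic` methods.
#
# Returns true when the topic was created and false when it already existed.
def ensure_topic(admin, topic_name, partitions_count, replication_factor)
  existing = admin.cluster_info.topics.map { |topic| topic[:topic_name] }

  # Nothing to do when the topic is already present
  return false if existing.include?(topic_name)

  admin.create_topic(topic_name, partitions_count, replication_factor)
  true
end
```

In an application where Karafka is loaded, this could be called as `ensure_topic(Karafka::Admin, 'my_cool_topic', 2, 1)`. The check and the creation are two separate calls, so another process could still create the topic in between.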

Reading topic messages

By using the read_topic method, you can read data from a given topic partition without subscribing to it.

Note: The returned messages are Karafka::Messages::Message objects, but their topic details may be inaccurate unless the given topic is defined in Karafka routes. For topics that are not defined there, defaults are used.

Getting last N messages

topic = 'my_topic'
partition = 0
how_many = 10

messages = Karafka::Admin.read_topic(topic, partition, how_many)

messages.each do |message|
  puts message.raw_payload
end
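
To make "last N messages" concrete, the sketch below shows which offsets such a read would cover. It illustrates the arithmetic only and is not taken from Karafka's source: `last_n_offsets` is a hypothetical helper, the watermark values are made up, and the partition is assumed to have no gaps in its offsets (for example, no log compaction).

```ruby
# Hypothetical helper illustrating the arithmetic only; not Karafka's code.
# low_watermark:  offset of the oldest message still available
# high_watermark: offset that the next produced message will receive
def last_n_offsets(low_watermark, high_watermark, how_many)
  # Never start before the oldest message that is still available
  first = [high_watermark - how_many, low_watermark].max

  (first...high_watermark)
end

last_n_offsets(0, 100, 10).to_a  # offsets 90 through 99
last_n_offsets(95, 100, 10).to_a # offsets 95 through 99, fewer than requested
```

When a partition holds fewer messages than requested, fewer messages can be expected back.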

Getting messages from a given offset

topic = 'my_topic'
partition = 0
how_many = 10
first_offset = 50

messages = Karafka::Admin.read_topic(topic, partition, how_many, first_offset)

messages.each do |message|
  puts message.raw_payload
end
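
Reading from a given offset can be repeated to walk through a partition in batches. This is a minimal sketch under stated assumptions: `each_message_batch` is a hypothetical helper, `admin` is assumed to be `Karafka::Admin` or an object with a compatible `read_topic` method, and `read_topic` is assumed to return an empty array once no messages remain at or after the requested offset.

```ruby
# Hypothetical helper (not part of Karafka): walks a partition in batches,
# starting at `first_offset`. `admin` is assumed to be Karafka::Admin or an
# object with a compatible `read_topic` method.
def each_message_batch(admin, topic, partition, batch_size, first_offset)
  offset = first_offset

  loop do
    messages = admin.read_topic(topic, partition, batch_size, offset)

    # Assumption: an empty result means there is nothing left to read
    break if messages.empty?

    yield messages

    # Continue right after the last message of this batch
    offset = messages.last.offset + 1
  end
end
```

With Karafka loaded, a call might look like `each_message_batch(Karafka::Admin, 'my_topic', 0, 10, 50) { |batch| batch.each { |message| puts message.raw_payload } }`. The next offset is derived from the last message received, so gaps in the offsets do not cause messages to be read twice.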