Class: Karafka::Admin::Topics

Inherits:
Karafka::Admin
Defined in:
lib/karafka/admin/topics.rb

Overview

Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection

Class Method Summary

Methods inherited from Karafka::Admin

cluster_info, copy_consumer_group, create_topic, delete_consumer_group, delete_topic, read_lags_with_offsets, read_topic, rename_consumer_group, seek_consumer_group, topic_info, trigger_rebalance, with_admin, with_consumer

Class Method Details

.create(name, partitions, replication_factor, topic_config = {}) ⇒ void

This method returns an undefined value.

Creates a Kafka topic with the given settings
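
Examples:

Create a topic (illustrative sketch; the topic name and config values below are assumptions, not from the source):

Karafka::Admin::Topics.create(
  'events',
  6,
  2,
  # Any key from https://kafka.apache.org/documentation/#topicconfigs is accepted
  { 'retention.ms' => '604800000' }
)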

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: https://kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin/topics.rb', line 98

def create(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { names.include?(name) }
    )
  end
end

.create_partitions(name, partitions) ⇒ void

This method returns an undefined value.

Creates more partitions for a given topic
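
Examples:

Grow a topic to twelve partitions in total (illustrative name; note that partitions is the target total, not an increment):

Karafka::Admin::Topics.create_partitions('events', 12)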

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin/topics.rb', line 131

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { info(name).fetch(:partition_count) >= partitions }
    )
  end
end

.delete(name) ⇒ void

This method returns an undefined value.

Deletes a given topic
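
Examples:

Remove a topic (illustrative name):

Karafka::Admin::Topics.delete('events')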

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin/topics.rb', line 114

def delete(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { !names.include?(name) }
    )
  end
end

.info(topic_name) ⇒ Hash

Note:

This query is much more efficient than doing a full #cluster_info plus a topic lookup because it does not have to fetch data for all topics, only for the topic we’re interested in

Returns basic topic metadata
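
Examples:

Fetch metadata for a single topic (illustrative; besides :topic_name and :partition_count, which the method source references, the exact keys depend on the underlying metadata payload):

Karafka::Admin::Topics.info('events')
# => { topic_name: 'events', partition_count: 6, ... }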

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

    unknown_topic_or_part if requested topic is not found



# File 'lib/karafka/admin/topics.rb', line 196

def info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end

.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic
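
Examples:

Read recent messages (illustrative topic name):

# Fetch at most 10 of the latest messages from partition 0 (default start_offset of -1)
Karafka::Admin::Topics.read('events', 0, 10)

# Fetch at most 10 messages starting from the offset matching a point in time
Karafka::Admin::Topics.read('events', 0, 10, Time.now - 60)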

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (the default), we start from the latest offset. If a Time is provided, the matching offset will be resolved. If a negative value below -1 is provided, we move further back from the high watermark.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    messages read from the given topic partition

# File 'lib/karafka/admin/topics.rb', line 21

def read(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..(start_offset + count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related
    # to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets
    # that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash

Fetches the watermark offsets for a given topic partition or multiple topics and partitions

Examples:

Query single partition

Karafka::Admin::Topics.read_watermark_offsets('events', 0)
# => [0, 100]

Query specific partitions across multiple topics

Karafka::Admin::Topics.read_watermark_offsets(
  { 'events' => [0, 1], 'logs' => [0] }
)
# => {
#   'events' => {
#     0 => [0, 100],
#     1 => [0, 150]
#   },
#   'logs' => {
#     0 => [0, 50]
#   }
# }

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition number (required when first param is topic name)

Returns:

  • (Array<Integer, Integer>, Hash)

    when querying a single partition, returns an array with the low and high watermark offsets; when querying multiple, returns a nested hash



# File 'lib/karafka/admin/topics.rb', line 169

def read_watermark_offsets(name_or_hash, partition = nil)
  # Normalize input to hash format
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  result = Hash.new { |h, k| h[k] = {} }

  with_consumer do |consumer|
    topics_with_partitions.each do |topic, partitions|
      partitions.each do |partition_id|
        result[topic][partition_id] = consumer.query_watermark_offsets(topic, partition_id)
      end
    end
  end

  # Return single array for single partition query, hash for multiple
  partition ? result.dig(name_or_hash, partition) : result
end