Module: Karafka::Admin

Defined in:
lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/configs/resource.rb

Overview

Note:

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, that should be ok.

Note:

It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the admin.kafka config; however, there is no multi-cluster runtime support.

Admin actions that we can perform via Karafka on our Kafka cluster

Defined Under Namespace

Modules: Configs

Classes: Acl

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 454

def cluster_info
  with_admin(&:metadata)
end

.create_partitions(name, partitions) ⇒ Object

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin.rb', line 135

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topic_info(name).fetch(:partition_count) >= partitions }
    )
  end
end

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Creates Kafka topic with given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin.rb', line 106

def create_topic(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topics_names.include?(name) }
    )
  end
end
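
The create, delete and partition methods above all pass two lambdas to with_re_wait: one that waits on the handler and one that checks whether the change is visible. The helper itself is not shown on this page, so the following is only a guess at the general pattern, not its actual implementation. wait_until_visible, ResultNotVisibleSketchError, max_attempts and pause are hypothetical names used for illustration.

```ruby
# Hypothetical error used only by this sketch.
class ResultNotVisibleSketchError < StandardError; end

# Runs the wait step, then checks visibility. When the change is not visible
# yet, it retries until it becomes visible or the attempts run out.
# This is an assumed pattern, not the with_re_wait implementation.
def wait_until_visible(wait_step, visible, max_attempts: 5, pause: 0)
  attempts = 0

  begin
    attempts += 1
    wait_step.call
    raise ResultNotVisibleSketchError unless visible.call
  rescue ResultNotVisibleSketchError
    # The change may have become visible in the meantime
    return true if visible.call
    # Give up and re-raise once the attempts are used up
    raise if attempts >= max_attempts

    sleep(pause)
    retry
  end

  true
end
```

In this sketch the second lambda plays the role of checks such as topics_names.include?(name) in the listing above.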

.delete_consumer_group(consumer_group_id) ⇒ Object

Note:

This method should not be used on a running consumer group as it will not yield any results.

Removes given consumer group (if exists)

Parameters:

  • consumer_group_id (String)

    consumer group name



# File 'lib/karafka/admin.rb', line 325

def delete_consumer_group(consumer_group_id)
  with_admin do |admin|
    handler = admin.delete_group(consumer_group_id)
    handler.wait(max_wait_timeout: app_config.admin.max_wait_time)
  end
end

.delete_topic(name) ⇒ Object

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin.rb', line 120

def delete_topic(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { !topics_names.include?(name) }
    )
  end
end

.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash<String, Hash<Integer, <Hash<Integer>>>>

Note:

For topics that do not exist, topic details will be set to an empty hash

Note:

For topics that exist but were never consumed by a given CG we set -1 as lag and the offset on each of the partitions that were not consumed.

Note:

This lag reporting is for committed lags and is “Kafka-centric”, meaning that this represents lags from Kafka perspective and not the consumer. They may differ.

Reads lags and offsets for given topics in the context of consumer groups defined in the routing

Parameters:

  • consumer_groups_with_topics (Hash<String, Array<String>>) (defaults to: {})

    hash with consumer groups names with array of topics to query per consumer group inside

  • active_topics_only (Boolean) (defaults to: true)

    if set to false and the topics are taken from the routing, topics marked as inactive in the routing are selected as well

Returns:

  • (Hash<String, Hash<Integer, <Hash<Integer>>>>)

    hash where the top level keys are the consumer groups and values are hashes with topics and inside partitions with lags and offsets



# File 'lib/karafka/admin.rb', line 360

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  # We first fetch all the topics with partitions count that exist in the cluster so we
  # do not query for topics that do not exist and so we can get partitions count for all
  # the topics we may need. The non-existent and not consumed will be filled at the end
  existing_topics = cluster_info.topics.map do |topic|
    [topic[:topic_name], topic[:partition_count]]
  end.to_h.freeze

  # If no expected CGs, we use all from routing that have active topics
  if consumer_groups_with_topics.empty?
    consumer_groups_with_topics = Karafka::App.routes.map do |cg|
      cg_topics = cg.topics.select do |cg_topic|
        active_topics_only ? cg_topic.active? : true
      end

      [cg.id, cg_topics.map(&:name)]
    end.to_h
  end

  # We make a copy because we will remove ones with non-existing topics
  # We keep original requested consumer groups with topics for later backfilling
  cgs_with_topics = consumer_groups_with_topics.dup
  cgs_with_topics.transform_values!(&:dup)

  # We can query only topics that do exist, this is why we are cleaning those that do not
  # exist
  cgs_with_topics.each_value do |requested_topics|
    requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
  end

  groups_lags = Hash.new { |h, k| h[k] = {} }
  groups_offs = Hash.new { |h, k| h[k] = {} }

  cgs_with_topics.each do |cg, topics|
    # Do not add to tpl topics that do not exist
    next if topics.empty?

    tpl = Rdkafka::Consumer::TopicPartitionList.new

    with_consumer('group.id': cg) do |consumer|
      topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

      commit_offsets = consumer.committed(tpl)

      commit_offsets.to_h.each do |topic, partitions|
        groups_offs[cg][topic] = {}

        partitions.each do |partition|
          # -1 when no offset is stored
          groups_offs[cg][topic][partition.partition] = partition.offset || -1
        end
      end

      consumer.lag(commit_offsets).each do |topic, partitions_lags|
        groups_lags[cg][topic] = partitions_lags
      end
    end
  end

  consumer_groups_with_topics.each do |cg, topics|
    groups_lags[cg]

    topics.each do |topic|
      groups_lags[cg][topic] ||= {}

      next unless existing_topics.key?(topic)

      # We backfill because there is a case where our consumer group would consume for
      # example only one partition out of 20, rest needs to get -1
      existing_topics[topic].times do |partition_id|
        groups_lags[cg][topic][partition_id] ||= -1
      end
    end
  end

  merged = Hash.new { |h, k| h[k] = {} }

  groups_lags.each do |cg, topics|
    topics.each do |topic, partitions|
      merged[cg][topic] = {}

      partitions.each do |partition, lag|
        merged[cg][topic][partition] = {
          offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
          lag: lag
        }
      end
    end
  end

  merged
end
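
The returned hash can be post-processed with plain Ruby. A minimal sketch, assuming a result of the shape described above. The group name, topic names and numbers are invented sample data, and total_lag_per_group is a hypothetical helper that is not part of Karafka::Admin.

```ruby
# Sample data shaped like the return value of read_lags_with_offsets.
# All names and numbers here are made up for illustration.
SAMPLE_LAGS = {
  'example_group' => {
    'events' => {
      0 => { offset: 120, lag: 5 },
      1 => { offset: -1, lag: -1 } # partition never consumed by this group
    },
    'missing_topic' => {} # topic that does not exist in the cluster
  }
}.freeze

# Hypothetical helper: sums the known lags per consumer group and skips
# partitions that report -1 (no committed offset).
def total_lag_per_group(lags)
  lags.transform_values do |topics|
    topics.values.flat_map(&:values).sum do |details|
      details[:lag].negative? ? 0 : details[:lag]
    end
  end
end
```

Treating -1 as zero is a choice made for this sketch only. Depending on the use case, an unconsumed partition may be better reported separately.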

.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (default), the most recent count messages are read. If a time is provided, the appropriate offset is resolved. If a negative value below -1 is provided, the start moves further back from the high watermark.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    array with the messages that were read



# File 'lib/karafka/admin.rb', line 31

def read_topic(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..start_offset + (count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related to
    # log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets that
    # are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.count

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
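
The offset arithmetic in the listing above can be followed in isolation. A minimal sketch: readable_offsets is a hypothetical helper, and low_offset and high_offset stand in for the watermark offsets that the real method queries from Kafka. Time-based offsets are left out.

```ruby
# Hypothetical helper mirroring the offset range selection of read_topic.
def readable_offsets(low_offset, high_offset, count, start_offset = -1)
  # Negative start: count back from the high watermark
  start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
  # If the result is still negative, fall back to the low watermark
  start_offset = low_offset if start_offset.negative?

  requested = (start_offset..start_offset + (count - 1))
  available = (low_offset..(high_offset - 1))

  # Keep only the requested offsets that fall between the watermarks
  requested.select { |offset| available.include?(offset) }
end
```

For example, with watermarks 0 and 100 and a count of 10, the default start of -1 selects offsets 90 through 99, while a start of -11 selects 80 through 89.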

.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>

Fetches the watermark offsets for a given topic partition

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

Returns:

  • (Array<Integer, Integer>)

    low watermark offset and high watermark offset



# File 'lib/karafka/admin.rb', line 337

def read_watermark_offsets(name, partition)
  with_consumer do |consumer|
    consumer.query_watermark_offsets(name, partition)
  end
end

.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Note:

After migration, unless delete_previous is set to false, the old group will be removed.

Note:

If new consumer group exists, old offsets will be added to it.

Takes consumer group and its topics and migrates all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename. Defaults to true.



# File 'lib/karafka/admin.rb', line 292

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  remap = Hash.new { |h, k| h[k] = {} }

  old_lags = read_lags_with_offsets({ previous_name => topics })

  return if old_lags.empty?

  read_lags_with_offsets({ previous_name => topics })
    .fetch(previous_name)
    .each do |topic, partitions|
      partitions.each do |partition_id, details|
        offset = details[:offset]

        # No offset on this partition
        next if offset.negative?

        remap[topic][partition_id] = offset
      end
    end

  seek_consumer_group(new_name, remap)

  return unless delete_previous

  delete_consumer_group(previous_name)
end
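
The remapping step in the listing above can be tried without a cluster. A minimal sketch, assuming input shaped like the per-group part of the read_lags_with_offsets result. build_offsets_remap is a hypothetical helper, and the topic names in the usage note are invented.

```ruby
# Hypothetical helper: turns the per-partition details of one consumer group
# into a topic => { partition => offset } map, leaving out partitions without
# a committed offset (reported as -1).
def build_offsets_remap(topics_details)
  remap = Hash.new { |h, k| h[k] = {} }

  topics_details.each do |topic, partitions|
    partitions.each do |partition_id, details|
      offset = details[:offset]

      # No offset on this partition
      next if offset.negative?

      remap[topic][partition_id] = offset
    end
  end

  remap
end
```

Given 'events' with partition 0 at offset 42 and partition 1 at -1, only partition 0 ends up in the map. A topic whose partitions all report -1 does not appear at all.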

.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Moves the offset on a given consumer group and provided topic to the requested location

Examples:

Move a single topic partition nr 1 offset to 100

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 100 } })

Move offsets on all partitions of a topic to 100

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 100 })

Move offset to 5 seconds ago on partition 2

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 2 => 5.seconds.ago } })

Move to the earliest offset on all the partitions of a topic

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'earliest' })

Move to the latest (high-watermark) offset on all the partitions of a topic

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'latest' })

Move offset of a single partition to earliest

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'earliest' } })

Move offset of a single partition to latest

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'latest' } })

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

    Hash with list of topics and settings to where to move given consumer. It allows us to move particular partitions or whole topics if we want to reset all partitions to for example a point in time.



# File 'lib/karafka/admin.rb', line 177

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  tpl_base = {}

  # Normalize the data so we always have all partitions and topics in the same format
  # That is in a format where we have topics and all partitions with their per partition
  # assigned offsets
  topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
    tpl_base[topic] = {}

    if partitions_with_offsets.is_a?(Hash)
      tpl_base[topic] = partitions_with_offsets
    else
      topic_info(topic)[:partition_count].times do |partition|
        tpl_base[topic][partition] = partitions_with_offsets
      end
    end
  end

  tpl_base.each_value do |partitions|
    partitions.transform_values! do |position|
      # Support both symbol and string based references
      casted_position = position.is_a?(Symbol) ? position.to_s : position

      # This remap allows us to transform some special cases in a reference that can be
      # understood by Kafka
      case casted_position
      # Earliest is not always 0. When compacting/deleting it can be much later, that's why
      # we fetch the oldest possible offset
      when 'earliest'
        Time.now - HUNDRED_YEARS
      # Latest will always be the high-watermark offset and we can get it just by getting
      # a future position
      when 'latest'
        Time.now + HUNDRED_YEARS
      # Same as `'earliest'`
      when false
        Time.now - HUNDRED_YEARS
      # Regular offset case
      else
        position
      end
    end
  end

  tpl = Rdkafka::Consumer::TopicPartitionList.new
  # In case of time based location, we need to do a pre-resolution, that's why we keep it
  # separately
  time_tpl = Rdkafka::Consumer::TopicPartitionList.new

  # Distribute properly the offset type
  tpl_base.each do |topic, partitions_with_offsets|
    partitions_with_offsets.each do |partition, offset|
      target = offset.is_a?(Time) ? time_tpl : tpl
      # We reverse and uniq to make sure that potentially duplicated references are removed
      # in such a way that the newest stays
      target.to_h[topic] ||= []
      target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
      target.to_h[topic].reverse!
      target.to_h[topic].uniq!(&:partition)
      target.to_h[topic].reverse!
    end
  end

  settings = { 'group.id': consumer_group_id }

  with_consumer(settings) do |consumer|
    # If we have any time based stuff to resolve, we need to do it prior to commits
    unless time_tpl.empty?
      real_offsets = consumer.offsets_for_times(time_tpl)

      real_offsets.to_h.each do |name, results|
        results.each do |result|
          raise(Errors::InvalidTimeBasedOffsetError) unless result

          partition = result.partition

          # Negative offset means we're beyond last message and we need to query for the
          # high watermark offset to get the most recent offset and move there
          if result.offset.negative?
            _, offset = consumer.query_watermark_offsets(name, result.partition)
          else
            # If we get an offset, it means there existed a message close to this time
            # location
            offset = result.offset
          end

          # Since now we have proper offsets, we can add this to the final tpl for commit
          tpl.to_h[name] ||= []
          tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
          tpl.to_h[name].reverse!
          tpl.to_h[name].uniq!(&:partition)
          tpl.to_h[name].reverse!
        end
      end
    end

    consumer.commit_offsets(tpl, async: false)
  end
end
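
The normalization at the top of the listing above accepts either one position per topic or one position per partition. A minimal sketch of that step under stated assumptions: normalize_seek_targets is a hypothetical helper, partition_counts replaces the topic_info lookup against the cluster, and HUNDRED_YEARS_SKETCH is an assumed value, since the real HUNDRED_YEARS constant is not shown on this page. Unlike the listing, the sketch returns a new hash instead of mutating its input.

```ruby
# Stand-in for the HUNDRED_YEARS constant; its real value is an assumption.
HUNDRED_YEARS_SKETCH = 100 * 365 * 24 * 60 * 60

# Hypothetical helper: expands topic-wide positions to every partition and
# maps 'earliest', 'latest' and false to far past or far future times.
def normalize_seek_targets(targets, partition_counts, now: Time.now)
  targets.to_h do |topic, positions|
    per_partition =
      if positions.is_a?(Hash)
        positions.dup
      else
        # One position for the whole topic: apply it to each partition
        partition_counts.fetch(topic).times.to_h { |partition| [partition, positions] }
      end

    resolved = per_partition.transform_values do |position|
      # Support both symbol and string based references
      case position.is_a?(Symbol) ? position.to_s : position
      when 'earliest', false then now - HUNDRED_YEARS_SKETCH
      when 'latest' then now + HUNDRED_YEARS_SKETCH
      else position
      end
    end

    [topic, resolved]
  end
end
```

Positions that end up as Time objects are the ones the listing resolves through offsets_for_times before committing.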

.topic_info(topic_name) ⇒ Hash

Note:

This query is much more efficient than doing a full #cluster_info + topic lookup because it does not have to query for all the topics data but just the topic we’re interested in

Returns basic topic metadata

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

    unknown_topic_or_part if requested topic is not found



# File 'lib/karafka/admin.rb', line 467

def topic_info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end

.with_admin ⇒ Object

Creates admin instance and yields it. After usage it closes the admin instance



# File 'lib/karafka/admin.rb', line 509

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(native_kafka_auto_start: false)
  bind_oauth(bind_id, admin)

  admin.start
  proxy = ::Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end
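
The yield-then-close shape of with_admin can be illustrated without Kafka. A minimal sketch: FakeClient and with_fake_client are hypothetical stand-ins, and the OAuth binding and proxy wrapping from the listing are left out.

```ruby
# Hypothetical stand-in for an admin or consumer client.
class FakeClient
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Yields a fresh client and closes it afterwards, even if the block raises.
# The method returns the value of the block, not the value of the ensure part.
def with_fake_client
  client = FakeClient.new
  yield(client)
ensure
  client&.close
end
```

Because the close call sits in ensure, a caller cannot leak the client by raising inside the block, which matches the first note in the Overview about always closing the instance.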

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high frequency calls that would have to be delegated

Creates consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 483

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = ::Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end