Module: Karafka::Admin

Extended by:
Core::Helpers::Time
Defined in:
lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/configs/resource.rb

Overview

Note:

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, that should be ok.

Note:

It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the admin.kafka config; however, there is no multi-cluster runtime support.

Admin actions that we can perform via Karafka on our Kafka cluster

Defined Under Namespace

Modules: Configs
Classes: Acl

Class Method Summary

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 494

def cluster_info
  with_admin(&:metadata)
end
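
For example, a minimal sketch listing the topic names known to the cluster (what you do with the returned metadata is up to you):

# Fetch cluster metadata and list the names of all visible topics
info = Karafka::Admin.cluster_info
info.topics.map { |topic| topic[:topic_name] }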

.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Note:

If the new consumer group already exists, the old offsets will be added to it.

Takes a consumer group and its topics and copies all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

Returns:

  • (Boolean)

    true if anything was migrated, otherwise false



# File 'lib/karafka/admin.rb', line 305

def copy_consumer_group(previous_name, new_name, topics)
  remap = Hash.new { |h, k| h[k] = {} }

  old_lags = read_lags_with_offsets({ previous_name => topics })

  return false if old_lags.empty?
  return false if old_lags.values.all? { |topic_data| topic_data.values.all?(&:empty?) }

  read_lags_with_offsets({ previous_name => topics })
    .fetch(previous_name)
    .each do |topic, partitions|
      partitions.each do |partition_id, details|
        offset = details[:offset]

        # No offset on this partition
        next if offset.negative?

        remap[topic][partition_id] = offset
      end
    end

  seek_consumer_group(new_name, remap)

  true
end
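
For example, a minimal sketch (the group and topic names are illustrative); remember the old group should not be running:

# Copy offsets for the 'orders' topic from one consumer group to another
Karafka::Admin.copy_consumer_group('old-group', 'new-group', ['orders'])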

.create_partitions(name, partitions) ⇒ Object

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin.rb', line 152

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { topic_info(name).fetch(:partition_count) >= partitions }
    )
  end
end
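
For example, a minimal sketch (the topic name and partition count are illustrative). Note that partitions is the total number expected after the operation, not the number to add:

# Grow the 'events' topic to 12 partitions in total
Karafka::Admin.create_partitions('events', 12)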

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Creates Kafka topic with given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin.rb', line 123

def create_topic(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { topics_names.include?(name) }
    )
  end
end
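
For example, a minimal sketch (topic name, counts and config values are illustrative):

# Create a 6-partition topic with replication factor 3 and 7 days of retention
Karafka::Admin.create_topic(
  'events',
  6,
  3,
  { 'retention.ms' => 604_800_000 }
)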

.delete_consumer_group(consumer_group_id) ⇒ Object

Note:

This method should not be used on a running consumer group as it will not yield any results.

Removes given consumer group (if exists)

Parameters:

  • consumer_group_id (String)

    consumer group name



# File 'lib/karafka/admin.rb', line 365

def delete_consumer_group(consumer_group_id)
  with_admin do |admin|
    handler = admin.delete_group(consumer_group_id)
    handler.wait(max_wait_timeout: max_wait_time_seconds)
  end
end
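
For example, a minimal sketch (the group name is illustrative); the group should not be running when this is executed:

# Remove a consumer group that is no longer needed
Karafka::Admin.delete_consumer_group('obsolete-group')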

.delete_topic(name) ⇒ Object

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin.rb', line 137

def delete_topic(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { !topics_names.include?(name) }
    )
  end
end
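
For example, a minimal sketch (the topic name is illustrative):

# Remove a topic together with its data
Karafka::Admin.delete_topic('events')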

.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash<String, Hash<Integer, <Hash<Integer>>>>

Note:

For topics that do not exist, topic details will be set to an empty hash.

Note:

For topics that exist but were never consumed by a given CG, we set -1 as both the lag and the offset on each of the partitions that were not consumed.

Note:

This lag reporting is for committed lags and is “Kafka-centric”, meaning that it represents lags from the Kafka perspective and not the consumer's. They may differ.

Reads lags and offsets for given topics in the context of consumer groups defined in the routing

Parameters:

  • consumer_groups_with_topics (Hash<String, Array<String>>) (defaults to: {})

    hash with consumer group names as keys and arrays of topics to query per consumer group as values

  • active_topics_only (Boolean) (defaults to: true)

    if set to false, when we use routing topics, we will also select topics that are marked as inactive in the routing

Returns:

  • (Hash<String, Hash<Integer, <Hash<Integer>>>>)

    hash where the top-level keys are the consumer groups and the values are hashes with topics and, per topic, partitions with their lags and offsets



# File 'lib/karafka/admin.rb', line 400

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  # We first fetch all the topics with partitions count that exist in the cluster so we
  # do not query for topics that do not exist and so we can get partitions count for all
  # the topics we may need. The non-existent and not consumed will be filled at the end
  existing_topics = cluster_info.topics.map do |topic|
    [topic[:topic_name], topic[:partition_count]]
  end.to_h.freeze

  # If no expected CGs, we use all from routing that have active topics
  if consumer_groups_with_topics.empty?
    consumer_groups_with_topics = Karafka::App.routes.map do |cg|
      cg_topics = cg.topics.select do |cg_topic|
        active_topics_only ? cg_topic.active? : true
      end

      [cg.id, cg_topics.map(&:name)]
    end.to_h
  end

  # We make a copy because we will remove the ones with non-existing topics
  # We keep original requested consumer groups with topics for later backfilling
  cgs_with_topics = consumer_groups_with_topics.dup
  cgs_with_topics.transform_values!(&:dup)

  # We can query only topics that do exist, this is why we are cleaning those that do not
  # exist
  cgs_with_topics.each_value do |requested_topics|
    requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
  end

  groups_lags = Hash.new { |h, k| h[k] = {} }
  groups_offs = Hash.new { |h, k| h[k] = {} }

  cgs_with_topics.each do |cg, topics|
    # Do not add to tpl topics that do not exist
    next if topics.empty?

    tpl = Rdkafka::Consumer::TopicPartitionList.new

    with_consumer('group.id': cg) do |consumer|
      topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

      commit_offsets = consumer.committed(tpl)

      commit_offsets.to_h.each do |topic, partitions|
        groups_offs[cg][topic] = {}

        partitions.each do |partition|
          # -1 when no offset is stored
          groups_offs[cg][topic][partition.partition] = partition.offset || -1
        end
      end

      consumer.lag(commit_offsets).each do |topic, partitions_lags|
        groups_lags[cg][topic] = partitions_lags
      end
    end
  end

  consumer_groups_with_topics.each do |cg, topics|
    groups_lags[cg]

    topics.each do |topic|
      groups_lags[cg][topic] ||= {}

      next unless existing_topics.key?(topic)

      # We backfill because there is a case where our consumer group would consume for
      # example only one partition out of 20, rest needs to get -1
      existing_topics[topic].times do |partition_id|
        groups_lags[cg][topic][partition_id] ||= -1
      end
    end
  end

  merged = Hash.new { |h, k| h[k] = {} }

  groups_lags.each do |cg, topics|
    topics.each do |topic, partitions|
      merged[cg][topic] = {}

      partitions.each do |partition, lag|
        merged[cg][topic][partition] = {
          offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
          lag: lag
        }
      end
    end
  end

  merged
end
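
For example, a minimal sketch (group and topic names are illustrative). The result can be drilled into per group, topic and partition:

# Lags and offsets for one explicit consumer group and topic
lags = Karafka::Admin.read_lags_with_offsets('example_app_group' => ['orders'])

lags.fetch('example_app_group').fetch('orders').each do |partition, details|
  puts "partition #{partition}: offset=#{details[:offset]} lag=#{details[:lag]}"
end

# Or, with no arguments, for all consumer groups and active topics from the routing
Karafka::Admin.read_lags_with_offsets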

.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (the default), we will start from the latest offset. If a time is provided, the appropriate offset will be resolved. If a negative value beyond -1 is provided, we move further backwards.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

# File 'lib/karafka/admin.rb', line 48

def read_topic(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..start_offset + (count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related to
    # log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets that
    # are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
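
For example, a minimal sketch (topic name, partition and count are illustrative). With the default start_offset of -1 this reads from the end of the partition:

# Fetch up to the last 10 messages from partition 0 of the 'events' topic
messages = Karafka::Admin.read_topic('events', 0, 10)

messages.each do |message|
  puts "#{message.offset}: #{message.raw_payload}"
end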

.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>

Fetches the watermark offsets for a given topic partition

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

Returns:

  • (Array<Integer, Integer>)

    low watermark offset and high watermark offset



# File 'lib/karafka/admin.rb', line 377

def read_watermark_offsets(name, partition)
  with_consumer do |consumer|
    consumer.query_watermark_offsets(name, partition)
  end
end
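
For example, a minimal sketch (topic name and partition are illustrative):

# Low and high watermark offsets for partition 0 of the 'events' topic
low, high = Karafka::Admin.read_watermark_offsets('events', 0)

# Rough number of messages still stored (ignores compaction/retention gaps)
puts high - low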

.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Note:

After the migration, unless delete_previous is set to false, the old group will be removed.

Note:

If the new consumer group already exists, the old offsets will be added to it.

Takes a consumer group and its topics and migrates all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

    should we delete the previous consumer group after the rename. Defaults to true.

Returns:

  • (Boolean)

    true if the rename (and optionally the removal) was ok, or false if there was nothing to rename



# File 'lib/karafka/admin.rb', line 348

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  copy_result = copy_consumer_group(previous_name, new_name, topics)

  return false unless copy_result
  return copy_result unless delete_previous

  delete_consumer_group(previous_name)

  true
end
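
For example, a minimal sketch (group and topic names are illustrative); the old group is removed afterwards unless delete_previous: false is passed:

# Rename a consumer group, carrying over offsets for the 'orders' topic
Karafka::Admin.rename_consumer_group('old-group', 'new-group', ['orders'])

# Rename but keep the old group and its offsets around
Karafka::Admin.rename_consumer_group('old-group', 'new-group', ['orders'], delete_previous: false)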

.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Moves the offset on a given consumer group and provided topic to the requested location

Examples:

Move a single topic partition nr 1 offset to 100

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 100 } })

Move offsets on all partitions of a topic to 100

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 100 })

Move offset to 5 seconds ago on partition 2

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 2 => 5.seconds.ago } })

Move to the earliest offset on all the partitions of a topic

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'earliest' })

Move to the latest (high-watermark) offset on all the partitions of a topic

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => 'latest' })

Move offset of a single partition to earliest

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'earliest' } })

Move offset of a single partition to latest

Karafka::Admin.seek_consumer_group('group-id', { 'topic' => { 1 => 'latest' } })

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

    Hash with a list of topics and settings describing where to move the given consumer. It allows us to move particular partitions or whole topics if we want to reset all partitions to, for example, a point in time.



# File 'lib/karafka/admin.rb', line 194

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  tpl_base = {}

  # Normalize the data so we always have all partitions and topics in the same format
  # That is in a format where we have topics and all partitions with their per partition
  # assigned offsets
  topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
    tpl_base[topic] = {}

    if partitions_with_offsets.is_a?(Hash)
      tpl_base[topic] = partitions_with_offsets
    else
      topic_info(topic)[:partition_count].times do |partition|
        tpl_base[topic][partition] = partitions_with_offsets
      end
    end
  end

  tpl_base.each_value do |partitions|
    partitions.transform_values! do |position|
      # Support both symbol and string based references
      casted_position = position.is_a?(Symbol) ? position.to_s : position

      # This remap allows us to transform some special cases in a reference that can be
      # understood by Kafka
      case casted_position
      # Earliest is not always 0. When compacting/deleting it can be much later, that's why
      # we fetch the oldest possible offset
      when 'earliest'
        LONG_TIME_AGO
      # Latest will always be the high-watermark offset and we can get it just by getting
      # a future position
      when 'latest'
        Time.now + DAY_IN_SECONDS
      # Same as `'earliest'`
      when false
        LONG_TIME_AGO
      # Regular offset case
      else
        position
      end
    end
  end

  tpl = Rdkafka::Consumer::TopicPartitionList.new
  # In case of a time based location, we need to do a pre-resolution, that's why we keep it
  # separately
  time_tpl = Rdkafka::Consumer::TopicPartitionList.new

  # Distribute properly the offset type
  tpl_base.each do |topic, partitions_with_offsets|
    partitions_with_offsets.each do |partition, offset|
      target = offset.is_a?(Time) ? time_tpl : tpl
      # We reverse and uniq to make sure that potentially duplicated references are removed
      # in such a way that the newest stays
      target.to_h[topic] ||= []
      target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
      target.to_h[topic].reverse!
      target.to_h[topic].uniq!(&:partition)
      target.to_h[topic].reverse!
    end
  end

  settings = { 'group.id': consumer_group_id }

  with_consumer(settings) do |consumer|
    # If we have any time based stuff to resolve, we need to do it prior to commits
    unless time_tpl.empty?
      real_offsets = consumer.offsets_for_times(time_tpl)

      real_offsets.to_h.each do |name, results|
        results.each do |result|
          raise(Errors::InvalidTimeBasedOffsetError) unless result

          partition = result.partition

          # Negative offset means we're beyond last message and we need to query for the
          # high watermark offset to get the most recent offset and move there
          if result.offset.negative?
            _, offset = consumer.query_watermark_offsets(name, result.partition)
          else
            # If we get an offset, it means there existed a message close to this time
            # location
            offset = result.offset
          end

          # Since now we have proper offsets, we can add this to the final tpl for commit
          tpl.to_h[name] ||= []
          tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
          tpl.to_h[name].reverse!
          tpl.to_h[name].uniq!(&:partition)
          tpl.to_h[name].reverse!
        end
      end
    end

    consumer.commit_offsets(tpl, async: false)
  end
end

.topic_info(topic_name) ⇒ Hash

Note:

This query is much more efficient than doing a full #cluster_info + topic lookup because it does not have to query for all the topics' data but just the topic we’re interested in.

Returns basic topic metadata

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

    unknown_topic_or_part if requested topic is not found



# File 'lib/karafka/admin.rb', line 507

def topic_info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end
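
For example, a minimal sketch (the topic name is illustrative):

# Check how many partitions the 'events' topic currently has
Karafka::Admin.topic_info('events').fetch(:partition_count)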

.with_admin ⇒ Object

Creates an admin instance and yields it. After usage it closes the admin instance.



# File 'lib/karafka/admin.rb', line 549

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = ::Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end
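
For example, a minimal sketch that fetches cluster metadata through the yielded admin proxy (this is effectively what #cluster_info does):

# The admin instance is closed automatically once the block returns
Karafka::Admin.with_admin do |admin|
  admin.metadata
end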

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high frequency calls that would have to be delegated

Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 523

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = ::Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end
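
For example, a minimal sketch (topic name and partition are illustrative) that queries watermark offsets through the yielded consumer proxy:

# The consumer is unsubscribed and closed automatically once the block returns
Karafka::Admin.with_consumer do |consumer|
  consumer.query_watermark_offsets('events', 0)
end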