Class: Karafka::Admin::ConsumerGroups
- Inherits: Karafka::Admin (Object → Karafka::Admin → Karafka::Admin::ConsumerGroups)
- Defined in: lib/karafka/admin/consumer_groups.rb
Overview
Consumer group administration operations. Provides methods to manage Kafka consumer groups, including offset management, migration, and introspection.
Class Method Summary
- .copy(previous_name, new_name, topics) ⇒ Boolean
  Takes a consumer group and its topics and copies all the offsets to a new named group.
- .delete(consumer_group_id) ⇒ void
  Removes the given consumer group (if it exists).
- .read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{String => Hash{Integer => Hash}}}
  Reads lags and offsets for the given topics in the context of consumer groups defined in the routing.
- .rename(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
  Takes a consumer group and its topics and migrates all the offsets to a new named group.
- .seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ void
  Moves the offset on a given consumer group and provided topic to the requested location.
- .trigger_rebalance(consumer_group_id) ⇒ void
  Triggers a rebalance for the specified consumer group by briefly joining and leaving.
Methods inherited from Karafka::Admin
cluster_info, copy_consumer_group, create_partitions, create_topic, delete_consumer_group, delete_topic, read_topic, read_watermark_offsets, rename_consumer_group, seek_consumer_group, topic_info, with_admin, with_consumer
Class Method Details
.copy(previous_name, new_name, topics) ⇒ Boolean
This method should not be executed on a running consumer group, as it creates a “fake” consumer and uses it to move offsets.
If the new consumer group already exists, the old offsets will be added to it.
Takes a consumer group and its topics and copies all the offsets to a new named group.
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 165

def copy(previous_name, new_name, topics)
  remap = Hash.new { |h, k| h[k] = {} }

  old_lags = read_lags_with_offsets({ previous_name => topics })

  return false if old_lags.empty?
  return false if old_lags.values.all? { |topic_data| topic_data.values.all?(&:empty?) }

  read_lags_with_offsets({ previous_name => topics })
    .fetch(previous_name)
    .each do |topic, partitions|
      partitions.each do |partition_id, details|
        offset = details[:offset]

        # No offset on this partition
        next if offset.negative?

        remap[topic][partition_id] = offset
      end
    end

  seek(new_name, remap)

  true
end
```
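The remapping step above can be illustrated with a small, self-contained sketch (hypothetical lag data, no Kafka required): partitions without a committed offset report -1 and are skipped, so only real offsets get carried over to the new group.

```ruby
# Hypothetical lag data in the shape read_lags_with_offsets returns for one group:
# { topic => { partition_id => { offset:, lag: } } }
old_lags = {
  'events' => {
    0 => { offset: 100, lag: 5 },
    1 => { offset: -1, lag: -1 }, # never consumed: no committed offset
    2 => { offset: 42, lag: 0 }
  }
}

# Default-block hash so new topics get an empty partition map automatically
remap = Hash.new { |h, k| h[k] = {} }

old_lags.each do |topic, partitions|
  partitions.each do |partition_id, details|
    offset = details[:offset]

    # Negative offset means nothing was committed on this partition, skip it
    next if offset.negative?

    remap[topic][partition_id] = offset
  end
end

puts remap.inspect
```

Running the sketch leaves only partitions 0 and 2 of 'events' in the remap, which is exactly what gets passed to seek on the new group.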
.delete(consumer_group_id) ⇒ void
This method should not be used on a running consumer group, as it will not yield any results.
This method returns an undefined value.
Removes the given consumer group (if it exists).
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 228

def delete(consumer_group_id)
  with_admin do |admin|
    handler = admin.delete_group(consumer_group_id)
    handler.wait(max_wait_timeout: max_wait_time_seconds)
  end
end
```
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{String => Hash{Integer => Hash}}}
For topics that do not exist, topic details will be set to an empty hash.
For topics that exist but were never consumed by the given consumer group, lag and offset are set to -1 on each partition that was not consumed.
This lag reporting covers committed lags and is “Kafka-centric”: it represents lag from Kafka's perspective, not the consumer's, and the two may differ.
Reads lags and offsets for the given topics in the context of consumer groups defined in the routing.
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 324

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  # We first fetch all the topics with partitions count that exist in the cluster so we
  # do not query for topics that do not exist and so we can get partitions count for all
  # the topics we may need. The non-existent and not consumed will be filled at the end
  existing_topics = cluster_info.topics.to_h do |topic|
    [topic[:topic_name], topic[:partition_count]]
  end.freeze

  # If no expected CGs, we use all from routing that have active topics
  if consumer_groups_with_topics.empty?
    consumer_groups_with_topics = Karafka::App.routes.to_h do |cg|
      cg_topics = cg.topics.select do |cg_topic|
        active_topics_only ? cg_topic.active? : true
      end

      [cg.id, cg_topics.map(&:name)]
    end
  end

  # We make a copy because we will remove ones with non-existing topics
  # We keep original requested consumer groups with topics for later backfilling
  cgs_with_topics = consumer_groups_with_topics.dup
  cgs_with_topics.transform_values!(&:dup)

  # We can query only topics that do exist, this is why we are cleaning those that
  # do not exist
  cgs_with_topics.each_value do |requested_topics|
    requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
  end

  groups_lags = Hash.new { |h, k| h[k] = {} }
  groups_offs = Hash.new { |h, k| h[k] = {} }

  cgs_with_topics.each do |cg, topics|
    # Do not add to tpl topics that do not exist
    next if topics.empty?

    tpl = Rdkafka::Consumer::TopicPartitionList.new

    with_consumer('group.id': cg) do |consumer|
      topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

      commit_offsets = consumer.committed(tpl)

      commit_offsets.to_h.each do |topic, partitions|
        groups_offs[cg][topic] = {}

        partitions.each do |partition|
          # -1 when no offset is stored
          groups_offs[cg][topic][partition.partition] = partition.offset || -1
        end
      end

      consumer.lag(commit_offsets).each do |topic, partitions_lags|
        groups_lags[cg][topic] = partitions_lags
      end
    end
  end

  consumer_groups_with_topics.each do |cg, topics|
    groups_lags[cg]

    topics.each do |topic|
      groups_lags[cg][topic] ||= {}

      next unless existing_topics.key?(topic)

      # We backfill because there is a case where our consumer group would consume for
      # example only one partition out of 20, rest needs to get -1
      existing_topics[topic].times do |partition_id|
        groups_lags[cg][topic][partition_id] ||= -1
      end
    end
  end

  merged = Hash.new { |h, k| h[k] = {} }

  groups_lags.each do |cg, topics|
    topics.each do |topic, partitions|
      merged[cg][topic] = {}

      partitions.each do |partition, lag|
        merged[cg][topic][partition] = {
          offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
          lag: lag
        }
      end
    end
  end

  merged
end
```
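The backfilling behavior described in the notes can be shown in isolation. A minimal pure-Ruby sketch (hypothetical group and topic names, no cluster needed) of how partitions a group never consumed end up reported as -1:

```ruby
# Hypothetical: the cluster reports 4 partitions for 'events', but the group
# committed an offset only on partition 0
existing_topics = { 'events' => 4 }
groups_lags = { 'example-group' => { 'events' => { 0 => 10 } } }

groups_lags.each_value do |topics|
  topics.each do |topic, partitions|
    # Every partition the group never touched gets -1, matching the documented
    # convention for "exists but was never consumed"
    existing_topics.fetch(topic).times do |partition_id|
      partitions[partition_id] ||= -1
    end
  end
end

puts groups_lags.inspect
```

After the backfill, partition 0 keeps its real lag of 10 while partitions 1 through 3 report -1, which is the shape callers should expect for partially consumed topics.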
.rename(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
This method should not be executed on a running consumer group, as it creates a “fake” consumer and uses it to move offsets.
After migration, unless delete_previous is set to false, the old group will be removed.
If the new consumer group already exists, the old offsets will be added to it.
Takes a consumer group and its topics and migrates all the offsets to a new named group.
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 209

def rename(previous_name, new_name, topics, delete_previous: true)
  copy_result = copy(previous_name, new_name, topics)

  return false unless copy_result
  return copy_result unless delete_previous

  delete(previous_name)

  true
end
```
.seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ void
This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.
This method returns an undefined value.
Moves the offset on a given consumer group and provided topic to the requested location.
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 54

def seek(consumer_group_id, topics_with_partitions_and_offsets)
  tpl_base = {}

  # Normalize the data so we always have all partitions and topics in the same format
  # That is in a format where we have topics and all partitions with their per partition
  # assigned offsets
  topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
    tpl_base[topic] = {}

    if partitions_with_offsets.is_a?(Hash)
      tpl_base[topic] = partitions_with_offsets
    else
      topic_info = Topics.info(topic)
      topic_info[:partition_count].times do |partition|
        tpl_base[topic][partition] = partitions_with_offsets
      end
    end
  end

  tpl_base.each_value do |partitions|
    partitions.transform_values! do |position|
      # Support both symbol and string based references
      casted_position = position.is_a?(Symbol) ? position.to_s : position

      # This remap allows us to transform some special cases in a reference that can be
      # understood by Kafka
      case casted_position
      # Earliest is not always 0. When compacting/deleting it can be much later, that's
      # why we fetch the oldest possible offset
      # false is treated the same as 'earliest'
      when 'earliest', false
        LONG_TIME_AGO
      # Latest will always be the high-watermark offset and we can get it just by
      # getting a future position
      when 'latest'
        Time.now + DAY_IN_SECONDS
      # Regular offset case
      else
        position
      end
    end
  end

  tpl = Rdkafka::Consumer::TopicPartitionList.new
  # In case of time based location, we need to do a pre-resolution, that's why we keep
  # it separately
  time_tpl = Rdkafka::Consumer::TopicPartitionList.new

  # Distribute properly the offset type
  tpl_base.each do |topic, partitions_with_offsets|
    partitions_with_offsets.each do |partition, offset|
      target = offset.is_a?(Time) ? time_tpl : tpl

      # We reverse and uniq to make sure that potentially duplicated references are
      # removed in such a way that the newest stays
      target.to_h[topic] ||= []
      target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
      target.to_h[topic].reverse!
      target.to_h[topic].uniq!(&:partition)
      target.to_h[topic].reverse!
    end
  end

  settings = { 'group.id': consumer_group_id }

  with_consumer(settings) do |consumer|
    # If we have any time based stuff to resolve, we need to do it prior to commits
    unless time_tpl.empty?
      real_offsets = consumer.offsets_for_times(time_tpl)

      real_offsets.to_h.each do |name, results|
        results.each do |result|
          raise(Errors::InvalidTimeBasedOffsetError) unless result

          partition = result.partition

          # Negative offset means we're beyond last message and we need to query for
          # the high watermark offset to get the most recent offset and move there
          if result.offset.negative?
            _, offset = consumer.query_watermark_offsets(name, result.partition)
          else
            # If we get an offset, it means there existed a message close to this time
            # location
            offset = result.offset
          end

          # Since now we have proper offsets, we can add this to the final tpl for commit
          tpl.to_h[name] ||= []
          tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
          tpl.to_h[name].reverse!
          tpl.to_h[name].uniq!(&:partition)
          tpl.to_h[name].reverse!
        end
      end
    end

    consumer.commit_offsets(tpl, async: false)
  end
end
```
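The special position handling inside seek can be isolated into a pure-Ruby sketch. Note that LONG_TIME_AGO and DAY_IN_SECONDS below are stand-in values for illustration, not necessarily the constants Karafka itself defines:

```ruby
# Stand-in constants (assumptions; the real values live in Karafka internals)
LONG_TIME_AGO = Time.at(0)
DAY_IN_SECONDS = 60 * 60 * 24

# Map user-friendly position references onto values Kafka can resolve:
# 'earliest'/false -> a very old time, 'latest' -> a future time, else raw offset
def cast_position(position)
  casted = position.is_a?(Symbol) ? position.to_s : position

  case casted
  when 'earliest', false then LONG_TIME_AGO
  when 'latest' then Time.now + DAY_IN_SECONDS
  else position
  end
end

puts cast_position(100) #=> 100
```

Time-typed results (from 'earliest', 'latest', or an explicit Time) land in the time-based topic partition list and get pre-resolved via offsets_for_times, while plain integers are committed directly.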
.trigger_rebalance(consumer_group_id) ⇒ void
This method creates a temporary “fake” consumer that joins the consumer group, triggering a rebalance when it joins and another when it leaves. This should only be used for operational/testing purposes as it causes two rebalances.
The consumer group does not need to be running for this to work, but if it is, it will experience rebalances.
The behavior follows the configured rebalance protocol. For cooperative sticky rebalancing or KIP-848 based protocols, there may be no immediate reaction to the rebalance trigger as these protocols allow incremental partition reassignments without stopping all consumers.
Topics are always detected from the routing configuration. The consumer settings (kafka config) are taken from the first topic in the consumer group to ensure consistency with the actual consumer configuration.
This method returns an undefined value.
Triggers a rebalance for the specified consumer group by briefly joining and leaving.
```ruby
# File 'lib/karafka/admin/consumer_groups.rb', line 262

def trigger_rebalance(consumer_group_id)
  consumer_group = Karafka::App.routes.find { |cg| cg.id == consumer_group_id }

  unless consumer_group
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' not found in routing"
    )
  end

  topics = consumer_group.topics.map(&:name)

  if topics.empty?
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' has no topics"
    )
  end

  # Get the first topic to extract kafka settings
  first_topic = consumer_group.topics.first

  # Build consumer settings using the consumer group's kafka config from first topic
  # This ensures we use the same settings as the actual consumers
  # Following the same pattern as in Karafka::Connection::Client#build_kafka
  consumer_settings = Setup::AttributesMap.consumer(first_topic.kafka.dup)
  consumer_settings[:'group.id'] = consumer_group.id
  consumer_settings[:'enable.auto.offset.store'] = false
  consumer_settings[:'auto.offset.reset'] ||= first_topic.initial_offset

  with_consumer(consumer_settings) do |consumer|
    # Subscribe to the topics - this triggers the first rebalance
    consumer.subscribe(*topics)

    # Wait briefly (100ms) to allow the rebalance to initiate
    # The actual rebalance happens asynchronously, so we just need to give it a moment
    sleep(0.1)

    # Unsubscribe - this will trigger the second rebalance when the consumer closes
    # The ensure block in with_consumer will handle the unsubscribe and close
  end
end
```
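The settings derivation can be sketched without a running cluster. Below, the per-topic kafka hash, the group name, and the initial offset are all hypothetical values; the point is the overrides trigger_rebalance applies on top of the routed topic's config:

```ruby
# Hypothetical kafka config taken from the first topic in the consumer group
first_topic_kafka = { 'bootstrap.servers': 'localhost:9092' }
initial_offset = 'earliest' # hypothetical routed initial_offset

consumer_settings = first_topic_kafka.dup
# Join as the real group so Kafka sees a new member and rebalances
consumer_settings[:'group.id'] = 'example_app-consumers'
# The fake consumer must never store offsets for the real group
consumer_settings[:'enable.auto.offset.store'] = false
# Fall back to the topic's configured initial offset only when none is set
consumer_settings[:'auto.offset.reset'] ||= initial_offset

puts consumer_settings[:'group.id'] #=> example_app-consumers
```

Disabling enable.auto.offset.store is the key safety measure here: the temporary member joins and leaves the group without ever committing offsets that could disturb the real consumers.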