Class: Karafka::Admin

Inherits:
Object
Extended by:
Core::Helpers::Time
Defined in:
lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/topics.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/consumer_groups.rb,
lib/karafka/admin/configs/resource.rb

Overview

Note:

It always initializes a new admin instance because we want to ensure it is always closed. Since admin actions are not performed that often, this overhead should be acceptable.

Note:

It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the admin.kafka config; however, there is no multi-cluster runtime support.

Admin actions that we can perform via Karafka on our Kafka cluster

Direct Known Subclasses

Acl, Configs, ConsumerGroups, Topics

Defined Under Namespace

Classes: Acl, Configs, ConsumerGroups, Topics

Class Method Summary

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 143

def cluster_info
  with_admin(&:metadata)
end
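
Example (a usage sketch; the topic-name mapping assumes the standard Rdkafka::Metadata structure):

info = Karafka::Admin.cluster_info
# List the names of all topics visible in the cluster
info.topics.map { |topic| topic[:topic_name] }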

.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean

Takes a consumer group and its topics and copies all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

Returns:

  • (Boolean)

    true if anything was migrated, otherwise false

See Also:

  • Karafka::Admin::ConsumerGroups.copy

# File 'lib/karafka/admin.rb', line 90

def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.copy(previous_name, new_name, topics)
end
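
Example (group and topic names are illustrative):

# Copy offsets for the listed topics from one consumer group to another,
# leaving the original group intact
Karafka::Admin.copy_consumer_group('old_group', 'new_group', %w[events logs])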

.create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

See Also:

  • Karafka::Admin::Topics.create_partitions

# File 'lib/karafka/admin.rb', line 58

def create_partitions(name, partitions)
  Topics.create_partitions(name, partitions)
end
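
Example (values are illustrative; per the parameter description above, partitions is the total target count, not an increment):

# Expand the 'events' topic so it ends up with 12 partitions in total
Karafka::Admin.create_partitions('events', 12)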

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details

See Also:

  • Karafka::Admin::Topics.create

# File 'lib/karafka/admin.rb', line 45

def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.create(name, partitions, replication_factor, topic_config)
end
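
Example (topic name and config are illustrative):

# Create a compacted topic with 6 partitions and a replication factor of 3
Karafka::Admin.create_topic('users_states', 6, 3, 'cleanup.policy': 'compact')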

.delete_consumer_group(consumer_group_id) ⇒ Object

Removes the given consumer group (if it exists)

Parameters:

  • consumer_group_id (String)

    consumer group name

See Also:

  • Karafka::Admin::ConsumerGroups.delete

# File 'lib/karafka/admin.rb', line 112

def delete_consumer_group(consumer_group_id)
  ConsumerGroups.delete(consumer_group_id)
end
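
Example (group name is illustrative):

# Remove a consumer group that is no longer in use
Karafka::Admin.delete_consumer_group('obsolete_group')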

.delete_topic(name) ⇒ Object

Parameters:

  • name (String)

    topic name

See Also:

  • Karafka::Admin::Topics.delete

# File 'lib/karafka/admin.rb', line 51

def delete_topic(name)
  Topics.delete(name)
end
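
Example (topic name is illustrative):

Karafka::Admin.delete_topic('events')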

.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{String => Hash{Integer => Object}}}

Reads lags and offsets for the given topics in the context of consumer groups defined in the routing

Parameters:

  • consumer_groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

hash with consumer group names as keys, each pointing to an array of topics to query for that group

  • active_topics_only (Boolean) (defaults to: true)

when set to false and routing topics are used, topics marked as inactive in the routing will also be selected

Returns:

  • (Hash{String => Hash{String => Hash{Integer => Object}}})

hash where the top-level keys are the consumer groups and the values are hashes of topics, each holding partitions with their lags and offsets

See Also:

  • Karafka::Admin::ConsumerGroups.read_lags_with_offsets

# File 'lib/karafka/admin.rb', line 135

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
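
Example (group and topic names are illustrative; the inner result keys are an assumption based on the return description above):

Karafka::Admin.read_lags_with_offsets({ 'example_app_group' => %w[events] })
# => { 'example_app_group' => { 'events' => { 0 => { offset: 100, lag: 5 } } } }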

.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings

See Also:

  • Karafka::Admin::Topics.read

# File 'lib/karafka/admin.rb', line 36

def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.read(name, partition, count, start_offset, settings)
end
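
Example (topic name and counts are illustrative; per the parameters above, start_offset also accepts a Time):

# Fetch at most 10 of the most recent messages from partition 0
messages = Karafka::Admin.read_topic('events', 0, 10)

# Fetch at most 10 messages starting from a point in time
messages = Karafka::Admin.read_topic('events', 0, 10, Time.now - 3600)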

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

See Also:

  • Karafka::Admin::Topics.read_watermark_offsets

# File 'lib/karafka/admin.rb', line 65

def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.read_watermark_offsets(name_or_hash, partition)
end
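
Example (names are illustrative; the two-element return and the topic => partitions hash shape are assumptions, as the signature above documents neither):

# Single topic and partition
low, high = Karafka::Admin.read_watermark_offsets('events', 0)

# Hash form covering several partitions at once (the partition argument stays nil)
Karafka::Admin.read_watermark_offsets({ 'events' => [0, 1] })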

.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean

Takes a consumer group and its topics and migrates all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

whether we should delete the previous consumer group after the rename. Defaults to true.

Returns:

  • (Boolean)

true if the rename (and optional removal) succeeded, or false if there was nothing to rename

See Also:

  • Karafka::Admin::ConsumerGroups.rename

# File 'lib/karafka/admin.rb', line 104

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.rename(previous_name, new_name, topics, delete_previous: delete_previous)
end
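
Example (group and topic names are illustrative):

# Migrate offsets to a new group name and remove the old group afterwards
Karafka::Admin.rename_consumer_group('old_group', 'new_group', %w[events])

# Same, but keep the old group around
Karafka::Admin.rename_consumer_group('old_group', 'new_group', %w[events], delete_previous: false)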

.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

Hash with topics, their partitions and the offsets to which we want to move the group

See Also:

  • Karafka::Admin::ConsumerGroups.seek

# File 'lib/karafka/admin.rb', line 79

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.seek(consumer_group_id, topics_with_partitions_and_offsets)
end
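
Example (names and offsets are illustrative; the topic => { partition => offset } shape is an assumption based on the parameter name):

# Move the group's offset for partition 0 of 'events' to offset 10
Karafka::Admin.seek_consumer_group('example_app_group', { 'events' => { 0 => 10 } })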

.topic_info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

See Also:

  • Karafka::Admin::Topics.info

# File 'lib/karafka/admin.rb', line 71

def topic_info(topic_name)
  Topics.info(topic_name)
end
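
Example (topic name is illustrative):

# Fetch metadata details of a single topic
Karafka::Admin.topic_info('events')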

.trigger_rebalance(consumer_group_id) ⇒ Object

Note:

This API should be used only for development.

Triggers a rebalance for the specified consumer group

Parameters:

  • consumer_group_id (String)

    consumer group id to trigger rebalance for

See Also:

  • Karafka::Admin::ConsumerGroups.trigger_rebalance

# File 'lib/karafka/admin.rb', line 121

def trigger_rebalance(consumer_group_id)
  ConsumerGroups.trigger_rebalance(consumer_group_id)
end
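
Example (group name is illustrative; development use only, as noted above):

Karafka::Admin.trigger_rebalance('example_app_group')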

.with_admin ⇒ Object

Creates an admin instance and yields it. After usage it closes the admin instance.



# File 'lib/karafka/admin.rb', line 180

def with_admin
  bind_id = SecureRandom.uuid

  # Build the admin instance without auto-starting it, so the oauth listener
  # can be registered before the first connection attempt
  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  # Always yield a proxied instance, consistent with #with_consumer
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end
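
Example (a usage sketch; metadata is the same call that cluster_info delegates to):

# Run a custom operation against an auto-managed admin instance
Karafka::Admin.with_admin do |admin|
  admin.metadata
end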

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.

Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 154

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  # Build the consumer without auto-starting it, so the oauth listener can be
  # registered before the first connection attempt
  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  # Always yield a proxied consumer (see the note above)
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end
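
Example (a usage sketch; query_watermark_offsets is a standard rdkafka consumer call, used here purely for illustration):

# Use a low-level consumer that is automatically started and closed
Karafka::Admin.with_consumer do |consumer|
  consumer.query_watermark_offsets('events', 0)
end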