Class: Karafka::Admin
- Inherits:
Object
- Extended by:
Core::Helpers::Time
- Defined in:
- lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/topics.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/consumer_groups.rb,
lib/karafka/admin/configs/resource.rb
Overview
Admin actions that we can perform via Karafka on our Kafka cluster.
It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, that should be ok.
It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the admin.kafka config; however, there is no multi-cluster runtime support.
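For instance, pointing admin operations at a dedicated cluster is a setup-time change. A minimal sketch, assuming a standard karafka.rb setup block; the broker addresses are placeholders and the exact admin.kafka merge behavior depends on your Karafka version:

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': 'primary-kafka:9092' }
    # Assumed override: admin-only settings live under the admin.kafka scope
    config.admin.kafka[:'bootstrap.servers'] = 'admin-kafka:9092'
  end
end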
Direct Known Subclasses
Defined Under Namespace
Classes: Acl, Configs, ConsumerGroups, Topics
Class Method Summary
- .cluster_info ⇒ Rdkafka::Metadata
  Cluster metadata info.
- .copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
  Takes a consumer group and its topics and copies all the offsets to a new named group.
- .create_partitions(name, partitions) ⇒ Object
- .create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
- .delete_consumer_group(consumer_group_id) ⇒ Object
  Removes the given consumer group (if it exists).
- .delete_topic(name) ⇒ Object
- .read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
  Reads lags and offsets for the given topics in the context of consumer groups defined in the routing.
- .read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- .read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
- .rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
  Takes a consumer group and its topics and migrates all the offsets to a new named group.
- .seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
- .topic_info(topic_name) ⇒ Object
- .trigger_rebalance(consumer_group_id) ⇒ Object
  Triggers a rebalance for the specified consumer group.
- .with_admin ⇒ Object
  Creates an admin instance and yields it.
- .with_consumer(settings = {}) ⇒ Object
  Creates a consumer instance and yields it.
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 143

def cluster_info
  with_admin(&:metadata)
end
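A quick usage sketch; the accessor names below come from rdkafka-ruby's Rdkafka::Metadata and may differ between versions:

info = Karafka::Admin.cluster_info

# Broker list and topic names known to the primary cluster
# (the :topic_name key is assumed from rdkafka-ruby's metadata hashes)
puts info.brokers.inspect
puts info.topics.map { |topic| topic[:topic_name] }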
.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
Takes a consumer group and its topics and copies all the offsets to a new named group.
# File 'lib/karafka/admin.rb', line 90

def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.copy(previous_name, new_name, topics)
end
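A usage sketch; the group and topic names are placeholders, and topics is assumed to be an array of topic names whose committed offsets should be copied:

# Copy committed offsets from one group to another without touching the source
Karafka::Admin.copy_consumer_group(
  'old_group_name',
  'new_group_name',
  %w[orders payments]
)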
.create_partitions(name, partitions) ⇒ Object
# File 'lib/karafka/admin.rb', line 58

def create_partitions(name, partitions)
  Topics.create_partitions(name, partitions)
end
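A short sketch, assuming the partitions argument is the target total partition count (Kafka only allows increasing it):

# Grow the placeholder 'events' topic to 12 partitions in total
Karafka::Admin.create_partitions('events', 12)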
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 45

def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.create(name, partitions, replication_factor, topic_config)
end
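A usage sketch; the topic name and config values are placeholders, and topic_config keys are assumed to map to standard Kafka topic-level settings:

# Create a 6-partition topic with replication factor 3 and a retention override
Karafka::Admin.create_topic(
  'events',
  6,
  3,
  'retention.ms' => 86_400_000 # placeholder: 1 day
)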
.delete_consumer_group(consumer_group_id) ⇒ Object
Removes the given consumer group (if it exists).
# File 'lib/karafka/admin.rb', line 112

def delete_consumer_group(consumer_group_id)
  ConsumerGroups.delete(consumer_group_id)
end
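For example (the group name is a placeholder; the group should not have active consumers when it is removed):

# Drop the committed offsets and metadata of an obsolete group
Karafka::Admin.delete_consumer_group('obsolete_group_name')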
.delete_topic(name) ⇒ Object
# File 'lib/karafka/admin.rb', line 51

def delete_topic(name)
  Topics.delete(name)
end
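For example, with a placeholder topic name:

# Remove the topic from the primary cluster
Karafka::Admin.delete_topic('events')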
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
Reads lags and offsets for the given topics in the context of consumer groups defined in the routing.
# File 'lib/karafka/admin.rb', line 135

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
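A sketch of both call styles; invoked without arguments it works off the consumer groups defined in the routing, and the explicit hash form below (group name mapped to an array of topic names) is an assumption based on the signature:

# All routed consumer groups and their active topics
all_lags = Karafka::Admin.read_lags_with_offsets

# Assumed explicit form, restricted to a placeholder group and topics
scoped = Karafka::Admin.read_lags_with_offsets(
  { 'example_group' => %w[orders payments] },
  active_topics_only: false
)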
.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 36

def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.read(name, partition, count, start_offset, settings)
end
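A usage sketch; the topic name is a placeholder, with the default start_offset of -1 the most recent messages are assumed to be fetched, and the returned objects are assumed to behave like regular Karafka messages:

# Fetch the last 10 messages from partition 0
messages = Karafka::Admin.read_topic('events', 0, 10)

messages.each do |message|
  puts "#{message.offset}: #{message.raw_payload}"
end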
.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 65

def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.read_watermark_offsets(name_or_hash, partition)
end
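A sketch, assuming the simple (topic name, partition) form and a low/high watermark pair as the return value:

# Watermarks for partition 0 of the placeholder 'events' topic
low, high = Karafka::Admin.read_watermark_offsets('events', 0)
puts "offsets between watermarks: #{high - low}"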
.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
Takes a consumer group and its topics and migrates all the offsets to a new named group.
# File 'lib/karafka/admin.rb', line 104

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.rename(previous_name, new_name, topics, delete_previous: delete_previous)
end
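A usage sketch; unlike copy_consumer_group, this one migrates the offsets and by default removes the previous group, which the example opts out of (names are placeholders):

Karafka::Admin.rename_consumer_group(
  'old_group_name',
  'new_group_name',
  %w[orders payments],
  # Keep the old group around instead of the default removal
  delete_previous: false
)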
.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin.rb', line 79

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.seek(consumer_group_id, topics_with_partitions_and_offsets)
end
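A sketch of the assumed argument layout: topic names mapped to partition => offset pairs for the group being repositioned (all names and values are placeholders):

# Move the group back to offset 0 on two partitions of 'events'
Karafka::Admin.seek_consumer_group(
  'example_group',
  'events' => { 0 => 0, 1 => 0 }
)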
.topic_info(topic_name) ⇒ Object
# File 'lib/karafka/admin.rb', line 71

def topic_info(topic_name)
  Topics.info(topic_name)
end
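A sketch; the return value is assumed to be a hash of basic topic attributes built from cluster metadata:

info = Karafka::Admin.topic_info('events') # placeholder topic name
# :partition_count is an assumed key
puts info[:partition_count]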
.trigger_rebalance(consumer_group_id) ⇒ Object
This API should be used only for development.
Triggers a rebalance for the specified consumer group.
# File 'lib/karafka/admin.rb', line 121

def trigger_rebalance(consumer_group_id)
  ConsumerGroups.trigger_rebalance(consumer_group_id)
end
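A development-only sketch, per the note above (the group id is a placeholder):

# Force a rebalance of the group without restarting its consumers
Karafka::Admin.trigger_rebalance('example_app_group')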
.with_admin ⇒ Object
Creates an admin instance and yields it. After usage it closes the admin instance.
# File 'lib/karafka/admin.rb', line 180

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close
  unbind_oauth(bind_id)
end
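A sketch of using the yielded, proxied admin directly; the metadata call mirrors what cluster_info does internally:

# The admin instance is closed automatically after the block
Karafka::Admin.with_admin do |admin|
  puts admin.metadata.topics.size
end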
.with_consumer(settings = {}) ⇒ Object
We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 154

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close
  unbind_oauth(bind_id)
end
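A sketch of low-level usage via the yielded proxy; query_watermark_offsets is assumed from rdkafka-ruby's consumer API and the topic name is a placeholder:

# The consumer is unsubscribed and closed automatically after the block
Karafka::Admin.with_consumer do |consumer|
  low, high = consumer.query_watermark_offsets('events', 0)
  puts "events/0 watermarks: #{low}..#{high}"
end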