Class: Rdkafka::Metadata

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/metadata.rb

Defined Under Namespace

Classes: BrokerMetadata, CustomFFIStruct, Metadata, PartitionMetadata, TopicMetadata

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(native_client, topic_name = nil, timeout_ms = 2_000) ⇒ Metadata

Returns a new instance of Metadata.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rdkafka/metadata.rb', line 15

# Requests cluster metadata through the native client and loads it into this object.
#
# A request that fails with an error code listed in RETRIED_ERRORS is retried with
# exponential backoff (2**attempt * 0.1 seconds) for up to 10 retries; any other
# failure is re-raised immediately.
#
# @param native_client [Object] native client handle handed to the bindings
# @param topic_name [String, nil] topic to describe; when nil, all topics in the cluster are requested
# @param timeout_ms [Integer] timeout passed to the native metadata request
# @raise [Rdkafka::RdkafkaError] when the request fails with a non-retried code or the retries run out
def initialize(native_client, topic_name = nil, timeout_ms = 2_000)
  # `retry` re-runs the method body; `||=` keeps the counter from earlier attempts.
  attempt ||= 0
  attempt += 1

  # NOTE(review): a topic handle is created on every attempt, while `ensure` only
  # releases the one held when the method finally exits — confirm whether handles
  # from earlier attempts need to be destroyed before retrying.
  native_topic = if topic_name
    Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
  end

  # Out-parameter that receives the address of the native metadata struct.
  ptr = FFI::MemoryPointer.new(:pointer)

  # If topic_flag is 1, we request info about *all* topics in the cluster.  If topic_flag is 0,
  # we only request info about locally known topics (or a single topic if one is passed in).
  topic_flag = topic_name.nil? ? 1 : 0

  # Retrieve the Metadata
  result = Rdkafka::Bindings.rd_kafka_metadata(native_client, topic_flag, native_topic, ptr, timeout_ms)

  # Error Handling
  raise Rdkafka::RdkafkaError.new(result) unless result.zero?

  metadata_from_native(ptr.read_pointer)
rescue ::Rdkafka::RdkafkaError => e
  raise unless RETRIED_ERRORS.include?(e.code)
  raise if attempt > 10

  backoff_factor = 2**attempt
  timeout = backoff_factor * 0.1

  sleep(timeout)

  retry
ensure
  # Release the native resources whether the request succeeded or raised.
  Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic) if topic_name
  Rdkafka::Bindings.rd_kafka_metadata_destroy(ptr.read_pointer)
end

Instance Attribute Details

#brokers ⇒ Object (readonly)

Returns the value of attribute brokers.



5
6
7
# File 'lib/rdkafka/metadata.rb', line 5

# Read-only accessor.
#
# @return [Object] the current value of the `@brokers` instance variable
def brokers; @brokers; end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



5
6
7
# File 'lib/rdkafka/metadata.rb', line 5

# Read-only accessor.
#
# @return [Object] the current value of the `@topics` instance variable
def topics; @topics; end