Module: WaterDrop::Patches::Rdkafka::Producer

Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/waterdrop/patches/rdkafka/producer.rb

Overview

Rdkafka::Producer patches

Instance Method Summary collapse

Instance Method Details

#close(timeout_ms = 5_000) ⇒ Object

Closes our librdkafka instance with the flush patch

Parameters:

  • timeout_ms (Integer) (defaults to: 5_000)

    flush timeout



76
77
78
79
80
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 76

def close(timeout_ms = 5_000)
  ObjectSpace.undefine_finalizer(self)

  @client.close(nil, timeout_ms)
end

#initialize(*args) ⇒ Object

Parameters:

  • args (Object)

    arguments accepted by the original rdkafka producer



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 18

def initialize(*args)
  super

  @_partitions_count_cache = Concurrent::Hash.new do |cache, topic|
     = ::Rdkafka::Metadata.new(inner_kafka, topic).topics&.first

    cache[topic] = [
      monotonic_now,
       ? [:partition_count] : nil
    ]
  end
end

#inner_kafkaFFI::Pointer

Returns pointer to the raw librdkafka.

Returns:

  • (FFI::Pointer)

    pointer to the raw librdkafka



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 58

def inner_kafka
  unless @_inner_kafka
    version = ::Gem::Version.new(::Rdkafka::VERSION)

    if version < ::Gem::Version.new('0.12.0')
      @_inner_kafka = @native_kafka
    elsif version < ::Gem::Version.new('0.13.0.beta.1')
      @_inner_kafka = @client.native
    else
      @_inner_kafka = @native_kafka.inner
    end
  end

  @_inner_kafka
end

#nameString

Adds a method that allows us to get the native kafka producer name

In between rdkafka versions, there are internal changes that force us to add some extra magic to support all the versions.

Returns:

  • (String)

    producer instance name



37
38
39
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 37

def name
  @_name ||= ::Rdkafka::Bindings.rd_kafka_name(inner_kafka)
end

#partition_count(topic) ⇒ Integer

This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses partition_key from querying for the count with each message. Instead we query once every 30 seconds at most

Parameters:

  • topic (String)

    topic name

Returns:

  • (Integer)

    partition count for a given topic



47
48
49
50
51
52
53
54
55
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 47

def partition_count(topic)
  closed_producer_check(__method__)

  @_partitions_count_cache.delete_if do |_, cached|
    monotonic_now - cached.first > PARTITIONS_COUNT_TTL
  end

  @_partitions_count_cache[topic].last
end