Module: WaterDrop::Patches::Rdkafka::Producer
- Includes:
- Karafka::Core::Helpers::Time
- Defined in:
- lib/waterdrop/patches/rdkafka/producer.rb
Overview
Rdkafka::Producer patches
Instance Method Summary collapse
-
#close(timeout_ms = 5_000) ⇒ Object
Closes our librdkafka instance with the flush patch.
- #initialize(*args) ⇒ Object
-
#inner_kafka ⇒ FFI::Pointer
Pointer to the raw librdkafka.
-
#name ⇒ String
Adds a method that allows us to get the native kafka producer name.
-
#partition_count(topic) ⇒ Integer
This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses
partition_key
from querying for the count with each message.
Instance Method Details
#close(timeout_ms = 5_000) ⇒ Object
Closes our librdkafka instance with the flush patch
76 77 78 79 80 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 76 def close(timeout_ms = 5_000) ObjectSpace.undefine_finalizer(self) @client.close(nil, timeout_ms) end |
#initialize(*args) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 18 def initialize(*args) super @_partitions_count_cache = Concurrent::Hash.new do |cache, topic| = ::Rdkafka::Metadata.new(inner_kafka, topic).topics&.first cache[topic] = [ monotonic_now, ? [:partition_count] : nil ] end end |
#inner_kafka ⇒ FFI::Pointer
Returns pointer to the raw librdkafka.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 58 def inner_kafka unless @_inner_kafka version = ::Gem::Version.new(::Rdkafka::VERSION) if version < ::Gem::Version.new('0.12.0') @_inner_kafka = @native_kafka elsif version < ::Gem::Version.new('0.13.0.beta.1') @_inner_kafka = @client.native else @_inner_kafka = @native_kafka.inner end end @_inner_kafka end |
#name ⇒ String
Adds a method that allows us to get the native kafka producer name
In between rdkafka versions, there are internal changes that force us to add some extra magic to support all the versions.
37 38 39 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 37 def name @_name ||= ::Rdkafka::Bindings.rd_kafka_name(inner_kafka) end |
#partition_count(topic) ⇒ Integer
This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses partition_key
from querying for the count with each message. Instead we query once every 30 seconds at most
47 48 49 50 51 52 53 54 55 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 47 def partition_count(topic) closed_producer_check(__method__) @_partitions_count_cache.delete_if do |_, cached| monotonic_now - cached.first > PARTITIONS_COUNT_TTL end @_partitions_count_cache[topic].last end |