Module: WaterDrop::Patches::Rdkafka::Producer

Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/waterdrop/patches/rdkafka/producer.rb

Overview

Rdkafka::Producer patches

Instance Method Summary collapse

Instance Method Details

#close(timeout_ms = 5_000) ⇒ Object

Closes our librdkafka instance with the flush patch

Parameters:

  • timeout_ms (Integer) (defaults to: 5_000)

    flush timeout

[View source]

76
77
78
79
80
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 76

def close(timeout_ms = 5_000)
  ObjectSpace.undefine_finalizer(self)

  @client.close(nil, timeout_ms)
end

#initialize(*args) ⇒ Object

Parameters:

  • args (Object)

    arguments accepted by the original rdkafka producer

[View source]

18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 18

def initialize(*args)
  super

  @_partitions_count_cache = Concurrent::Hash.new do |cache, topic|
     = ::Rdkafka::Metadata.new(inner_kafka, topic).topics&.first

    cache[topic] = [
      monotonic_now,
       ? [:partition_count] : nil
    ]
  end
end

#inner_kafkaFFI::Pointer

Returns pointer to the raw librdkafka.

Returns:

  • (FFI::Pointer)

    pointer to the raw librdkafka

[View source]

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 58

def inner_kafka
  unless @_inner_kafka
    version = ::Gem::Version.new(::Rdkafka::VERSION)

    if version < ::Gem::Version.new('0.12.0')
      @_inner_kafka = @native_kafka
    elsif version < ::Gem::Version.new('0.13.0.beta.1')
      @_inner_kafka = @client.native
    else
      @_inner_kafka = @native_kafka.inner
    end
  end

  @_inner_kafka
end

#nameString

Adds a method that allows us to get the native kafka producer name

In between rdkafka versions, there are internal changes that force us to add some extra magic to support all the versions.

Returns:

  • (String)

    producer instance name

[View source]

37
38
39
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 37

def name
  @_name ||= ::Rdkafka::Bindings.rd_kafka_name(inner_kafka)
end

#partition_count(topic) ⇒ Integer

This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses partition_key from querying for the count with each message. Instead we query once every 30 seconds at most

Parameters:

  • topic (String)

    topic name

Returns:

  • (Integer)

    partition count for a given topic

[View source]

47
48
49
50
51
52
53
54
55
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 47

def partition_count(topic)
  closed_producer_check(__method__)

  @_partitions_count_cache.delete_if do |_, cached|
    monotonic_now - cached.first > PARTITIONS_COUNT_TTL
  end

  @_partitions_count_cache[topic].last
end