Module: WaterDrop::Patches::Rdkafka::Client

Defined in:
lib/waterdrop/patches/rdkafka/client.rb

Overview

Patches for the producer client

Instance Method Summary collapse

Instance Method Details

#close(_object_id = nil, timeout_ms = 5_000) ⇒ Object

Parameters:

  • _object_id (nil) (defaults to: nil)

    rdkafka API compatibility argument

  • timeout_ms (Integer) (defaults to: 5_000)

    final flush timeout in ms



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/waterdrop/patches/rdkafka/client.rb', line 10

def close(_object_id = nil, timeout_ms = 5_000)
  return unless @native

  # Indicate to polling thread that we're closing
  @polling_thread[:closing] = true
  # Wait for the polling thread to finish up
  @polling_thread.join

  ::Rdkafka::Bindings.rd_kafka_flush(@native, timeout_ms)
  ::Rdkafka::Bindings.rd_kafka_destroy(@native)

  @native = nil
end