Module: Karafka::Core::Patches::Rdkafka::Bindings

Defined in:
lib/karafka/core/patches/rdkafka/bindings.rb

Overview

Extends Rdkafka::Bindings with some extra methods and updates callbacks that we intend to work with in a bit different way than rdkafka itself

Class Method Summary collapse

Class Method Details

.build_error_callbackFFI::Function

Returns overwritten callback function.

Returns:

  • (FFI::Function)

    overwritten callback function



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/karafka/core/patches/rdkafka/bindings.rb', line 24

def build_error_callback
  FFI::Function.new(
    :void, %i[pointer int string pointer]
  ) do |client_prr, err_code, reason, _opaque|
    return nil unless ::Rdkafka::Config.error_callback

    name = ::Rdkafka::Bindings.rd_kafka_name(client_prr)

    error = ::Rdkafka::RdkafkaError.new(err_code, broker_message: reason)
    error.set_backtrace(caller)

    ::Rdkafka::Config.error_callback.call(name, error)
  end
end

.included(mod) ⇒ Object

Add extra methods that we need

Parameters:

  • mod (::Rdkafka::Bindings)

    rdkafka bindings module



15
16
17
18
19
20
21
# File 'lib/karafka/core/patches/rdkafka/bindings.rb', line 15

def included(mod)
  # Default rdkafka setup for errors doest not propagate client details, thus it always
  # publishes all the stuff for all rdkafka instances. We change that by providing
  # function that fetches the instance name, allowing us to have better notifications
  mod.send(:remove_const, :ErrorCallback)
  mod.const_set(:ErrorCallback, build_error_callback)
end