Module: WaterDrop::Clients::Rdkafka

Defined in:
lib/waterdrop/clients/rdkafka.rb

Overview

Default Rdkafka client. Since we use the ::Rdkafka::Producer under the hood, this is just a module that aligns with client building API for the convenience.

Class Method Summary collapse

Class Method Details

.new(producer) ⇒ Object

Note:

We overwrite this that way, because we do not care

Parameters:



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/waterdrop/clients/rdkafka.rb', line 13

def new(producer)
  kafka_config = producer.config.kafka.to_h
  monitor = producer.config.monitor

  client = ::Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false)

  # Register statistics runner for this particular type of callbacks
  ::Karafka::Core::Instrumentation.statistics_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::Statistics.new(producer.id, client.name, monitor)
  )

  # Register error tracking callback
  ::Karafka::Core::Instrumentation.error_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::Error.new(producer.id, client.name, monitor)
  )

  # Register oauth bearer refresh for this particular type of callbacks
  ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::OauthbearerTokenRefresh.new(client, monitor)
  )

  # This callback is not global and is per client, thus we do not have to wrap it with a
  # callbacks manager to make it work
  client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
    producer.id,
    producer.transactional?,
    monitor
  )

  oauth_listener = producer.config.oauth.token_provider_listener
  # We need to subscribe the oauth listener here because we want it to be ready before
  # any producer callbacks run. In theory because WaterDrop rdkafka producer is lazy loaded
  # we would have enough time to make user subscribe it himself, but then it would not
  # coop with auto-configuration coming from Karafka. The way it is done below, if it is
  # configured it will be subscribed and if not, user always can subscribe it himself as
  # long as it is done prior to first usage
  monitor.subscribe(oauth_listener) if oauth_listener

  client.start

  # Switch to the transactional mode if user provided the transactional id
  client.init_transactions if kafka_config.key?(:'transactional.id')

  client
end