Module: WaterDrop::Clients::Rdkafka
- Defined in: lib/waterdrop/clients/rdkafka.rb
Overview
Default Rdkafka client. Because ::Rdkafka::Producer is used under the hood, this is just a module that aligns with the client-building API for convenience.
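The idea of a module that exposes a class-like building API can be sketched in plain Ruby. This is a minimal illustration rather than WaterDrop's implementation: `FakeNativeProducer` and `FakeClient` are hypothetical names that stand in for the native producer and for this module.

```ruby
# Hypothetical stand-in for the native producer that actually does the work
class FakeNativeProducer
  attr_reader :settings

  def initialize(settings)
    @settings = settings
  end
end

# A module cannot be instantiated, but it can define its own singleton `new`,
# so callers can build a client from it the same way they would from a class
module FakeClient
  class << self
    # Builds and returns the underlying object instead of a module instance
    def new(settings)
      FakeNativeProducer.new(settings)
    end
  end
end

client = FakeClient.new({ 'bootstrap.servers': 'localhost:9092' })
puts client.class
```

The caller only depends on `.new` returning a ready-to-use client, so a different client module with the same entry point could be swapped in without changing the calling code.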
Class Method Summary
Class Method Details
.new(producer) ⇒ Object
Note:
We overwrite this that way, because we do not care
# File 'lib/waterdrop/clients/rdkafka.rb', line 13

def new(producer)
  kafka_config = producer.config.kafka.to_h
  monitor = producer.config.monitor

  client = ::Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false)

  # Register statistics runner for this particular type of callbacks
  ::Karafka::Core::Instrumentation.statistics_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::Statistics.new(producer.id, client.name, monitor)
  )

  # Register error tracking callback
  ::Karafka::Core::Instrumentation.error_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::Error.new(producer.id, client.name, monitor)
  )

  # Register oauth bearer refresh for this particular type of callbacks
  ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.add(
    producer.id,
    Instrumentation::Callbacks::OauthbearerTokenRefresh.new(client, monitor)
  )

  # This callback is not global and is per client, thus we do not have to wrap it with a
  # callbacks manager to make it work
  client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
    producer.id,
    producer.transactional?,
    monitor
  )

  oauth_listener = producer.config.oauth.token_provider_listener
  # We need to subscribe the oauth listener here because we want it to be ready before
  # any producer callbacks run. In theory because WaterDrop rdkafka producer is lazy loaded
  # we would have enough time to make user subscribe it himself, but then it would not
  # coop with auto-configuration coming from Karafka. The way it is done below, if it is
  # configured it will be subscribed and if not, user always can subscribe it himself as
  # long as it is done prior to first usage
  monitor.subscribe(oauth_listener) if oauth_listener

  client.start

  # Switch to the transactional mode if user provided the transactional id
  client.init_transactions if kafka_config.key?(:'transactional.id')

  client
end
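The statistics, error and OAuth callbacks are registered in shared managers under `producer.id`, while the delivery callback is attached directly to the client. The following sketch shows why a keyed registry is useful for the shared case. It is not the Karafka implementation: `SketchCallbacksManager` and `SketchStatisticsCallback` are hypothetical classes that only mirror the `add(id, callable)` shape seen in the method body, and the filtering by client name is an assumption suggested by `client.name` being passed to each callback.

```ruby
# Hypothetical registry that mirrors the add(id, callable) shape used above
class SketchCallbacksManager
  def initialize
    @callbacks = {}
  end

  # Registers a callable under a unique id (here: the producer id)
  def add(id, callable)
    @callbacks[id] = callable
  end

  # Removes the callable registered under the given id
  def delete(id)
    @callbacks.delete(id)
  end

  # Fans one event out to every registered callable
  def call(*args)
    @callbacks.values.each { |callable| callable.call(*args) }
  end
end

# Hypothetical callback that ignores events emitted by other clients
# (assumption: events carry the emitting client's name under 'name')
class SketchStatisticsCallback
  attr_reader :received

  def initialize(producer_id, client_name)
    @producer_id = producer_id
    @client_name = client_name
    @received = []
  end

  def call(statistics)
    return unless statistics['name'] == @client_name

    @received << statistics
  end
end

manager = SketchCallbacksManager.new
first = SketchStatisticsCallback.new('producer-1', 'client-1')
second = SketchStatisticsCallback.new('producer-2', 'client-2')

manager.add('producer-1', first)
manager.add('producer-2', second)

# Only the callback whose client name matches records the event
manager.call({ 'name' => 'client-1', 'txmsgs' => 10 })
puts first.received.size
puts second.received.size
```

Keying by producer id lets each producer add and later remove only its own callback, even though all of them share one process-wide hook.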