Module: WaterDrop

Defined in:
lib/waterdrop.rb,
lib/waterdrop/config.rb,
lib/waterdrop/errors.rb,
lib/waterdrop/version.rb,
lib/waterdrop/producer.rb,
lib/waterdrop/contracts.rb,
lib/waterdrop/middleware.rb,
lib/waterdrop/clients/dummy.rb,
lib/waterdrop/producer/sync.rb,
lib/waterdrop/producer/async.rb,
lib/waterdrop/clients/rdkafka.rb,
lib/waterdrop/connection_pool.rb,
lib/waterdrop/helpers/counter.rb,
lib/waterdrop/producer/buffer.rb,
lib/waterdrop/producer/status.rb,
lib/waterdrop/clients/buffered.rb,
lib/waterdrop/contracts/config.rb,
lib/waterdrop/producer/builder.rb,
lib/waterdrop/producer/variant.rb,
lib/waterdrop/contracts/message.rb,
lib/waterdrop/contracts/variant.rb,
lib/waterdrop/producer/transactions.rb,
lib/waterdrop/producer/class_monitor.rb,
lib/waterdrop/instrumentation/monitor.rb,
lib/waterdrop/instrumentation/class_monitor.rb,
lib/waterdrop/instrumentation/notifications.rb,
lib/waterdrop/contracts/transactional_offset.rb,
lib/waterdrop/instrumentation/callbacks/error.rb,
lib/waterdrop/instrumentation/logger_listener.rb,
lib/waterdrop/instrumentation/callbacks/delivery.rb,
lib/waterdrop/instrumentation/class_notifications.rb,
lib/waterdrop/instrumentation/callbacks/statistics.rb,
lib/waterdrop/instrumentation/idle_disconnector_listener.rb,
lib/waterdrop/instrumentation/vendors/datadog/metrics_listener.rb,
lib/waterdrop/instrumentation/callbacks/oauthbearer_token_refresh.rb

Overview

WaterDrop library

Defined Under Namespace

Modules: Clients, Contracts, Errors, Helpers, Instrumentation Classes: Config, ConnectionPool, Middleware, Producer

Constant Summary collapse

AbortTransaction =

Alias so we can have a nicer API to abort transactions This makes referencing easier

Errors::AbortTransaction
VERSION =

Current WaterDrop version

'2.8.11'

Class Method Summary collapse

Class Method Details

.gem_rootString

Returns root path of this gem.

Returns:

  • (String)

    root path of this gem



16
17
18
# File 'lib/waterdrop.rb', line 16

def gem_root
  Pathname.new(File.expand_path('..', __dir__))
end

.monitorWaterDrop::Instrumentation::ClassMonitor Also known as: instrumentation

Note:

Only supports class-level events (producer.created, producer.configured), not instance events

Returns global monitor for class-level event subscriptions. This allows external libraries to subscribe to WaterDrop lifecycle events without needing producer instance references.

Examples:

Subscribe to producer creation events

WaterDrop.monitor.subscribe('producer.created') do |event|
  producer = event[:producer]
  # Configure producer or add middleware
end

Returns:

  • (WaterDrop::Instrumentation::ClassMonitor)

    global monitor for class-level event subscriptions. This allows external libraries to subscribe to WaterDrop lifecycle events without needing producer instance references.



32
33
34
# File 'lib/waterdrop.rb', line 32

def monitor
  @instrumentation ||= Instrumentation::ClassMonitor.new
end

.poolWaterDrop::ConnectionPool

Access the global connection pool

Examples:

WaterDrop.pool.with do |producer|
  producer.produce_async(topic: 'events', payload: 'data')
end

Returns:



337
338
339
# File 'lib/waterdrop/connection_pool.rb', line 337

def pool
  ConnectionPool.default_pool
end

.transaction(&block) {|producer| ... } ⇒ Object

Execute a transaction with a producer from the global connection pool Only available when connection pool is configured

Examples:

WaterDrop.transaction do |producer|
  producer.produce(topic: 'events', payload: 'data1')
  producer.produce(topic: 'events', payload: 'data2')
end

Parameters:

  • block (Proc)

    Block to execute within a transaction

Yields:

  • (producer)

    Producer from the global pool with an active transaction

Returns:

  • (Object)

    Result of the block



325
326
327
# File 'lib/waterdrop/connection_pool.rb', line 325

def transaction(&block)
  ConnectionPool.transaction(&block)
end

.with(&block) {|producer| ... } ⇒ Object

Execute a block with a producer from the global connection pool Only available when connection pool is configured

Examples:

WaterDrop.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Parameters:

  • block (Proc)

    Block to execute with a producer

Yields:

  • (producer)

    Producer from the global pool

Returns:

  • (Object)

    Result of the block



309
310
311
# File 'lib/waterdrop/connection_pool.rb', line 309

def with(&block)
  ConnectionPool.with(&block)
end