Class: WaterDrop::ConnectionPool

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/waterdrop/connection_pool.rb

Overview

Connection pool wrapper for WaterDrop producers using the proven connection_pool gem.

This provides a clean WaterDrop-specific API while leveraging the battle-tested connection_pool gem underneath. The wrapper hides direct usage of the connection_pool gem and adds WaterDrop-specific configuration.

Examples:

Basic usage

pool = WaterDrop::ConnectionPool.new(size: 10) do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.deliver = true
end

pool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Transactional producers with unique IDs

pool = WaterDrop::ConnectionPool.new(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': "my-app-#{index}"
  }
end

Global connection pool

WaterDrop::ConnectionPool.setup(size: 20) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
end

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_async(topic: 'events', payload: 'data')
end

Constructor Details

#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

Creates a new WaterDrop connection pool

Parameters:

  • size (Integer) (defaults to: 5)

    Pool size (default: 5)

  • timeout (Numeric) (defaults to: 5000)

    Maximum time to wait for an available producer, in milliseconds (default: 5000); passed to connection_pool in seconds

  • producer_config (Proc)

    Block to configure each producer in the pool

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index



# File 'lib/waterdrop/connection_pool.rb', line 201

def initialize(size: 5, timeout: 5000, &producer_config)
  self.class.send(:ensure_connection_pool_gem!)

  @producer_config = producer_config
  @pool_index = 0
  @pool_mutex = Mutex.new

  @pool = ::ConnectionPool.new(size: size, timeout: timeout / 1000.0) do
    producer_index = @pool_mutex.synchronize { @pool_index += 1 }

    WaterDrop::Producer.new do |config|
      if @producer_config.arity == 2
        @producer_config.call(config, producer_index)
      else
        @producer_config.call(config)
      end
    end
  end

  # Emit event when a connection pool is created
  WaterDrop.instrumentation.instrument(
    'connection_pool.created',
    pool: self,
    size: size,
    timeout: timeout
  )
end
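
The index numbering and arity check in the constructor can be tried without Kafka. The following is a minimal sketch in plain Ruby, assuming a stand-in class named IndexedFactory (hypothetical, not part of WaterDrop) that builds plain hashes instead of producers.

```ruby
# Hypothetical stand-in that mirrors how the constructor numbers producers
# and decides whether to pass the index to the configuration block.
class IndexedFactory
  def initialize(&config_block)
    @config_block = config_block
    @index = 0
    @mutex = Mutex.new
  end

  # Builds one "config" hash. The counter is incremented under a mutex so
  # that concurrent builders never receive the same index.
  def build
    index = @mutex.synchronize { @index += 1 }
    config = {}

    if @config_block.arity == 2
      @config_block.call(config, index)
    else
      @config_block.call(config)
    end

    config
  end
end

with_index = IndexedFactory.new { |config, index| config[:id] = "my-app-#{index}" }
without_index = IndexedFactory.new { |config| config[:deliver] = true }

first = with_index.build
second = with_index.build
plain = without_index.build
```

Because the counter is incremented before it is read, the first index handed to a two-parameter block is 1, not 0. Only a block whose arity is exactly 2 receives the index; any other block is called with the config alone.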

Class Attribute Details

.default_pool ⇒ Object

Global connection pool instance



# File 'lib/waterdrop/connection_pool.rb', line 45

def default_pool
  @default_pool
end

Instance Attribute Details

#pool ⇒ ::ConnectionPool (readonly)

Returns the underlying connection_pool instance. This allows access to advanced connection_pool features if needed.

Returns:

  • (::ConnectionPool)

    The underlying connection pool



# File 'lib/waterdrop/connection_pool.rb', line 293

def pool
  @pool
end

Class Method Details

.active? ⇒ Boolean

Check if the global connection pool is active (configured)

Returns:

  • (Boolean)

    true if global pool is configured, false otherwise



# File 'lib/waterdrop/connection_pool.rb', line 150

def active?
  !@default_pool.nil?
end

.close ⇒ Object

Shuts down the global connection pool. Alias for shutdown: WaterDrop producers use #close, so the pool exposes #close as well to keep the API consistent across individual producers and connection pools.



# File 'lib/waterdrop/connection_pool.rb', line 132

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    'connection_pool.shutdown',
    pool: pool
  )
end

.reload ⇒ Object

Reload the global connection pool



# File 'lib/waterdrop/connection_pool.rb', line 135

def reload
  return unless @default_pool

  @default_pool.reload

  # Emit global event for pool reload
  WaterDrop.instrumentation.instrument(
    'connection_pool.reload',
    pool: @default_pool
  )
end

.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

Sets up a global connection pool

Examples:

Basic setup

WaterDrop::ConnectionPool.setup(size: 15) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
  config.deliver = true
end

Transactional setup with unique IDs

WaterDrop::ConnectionPool.setup(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': ENV['KAFKA_BROKERS'],
    'transactional.id': "my-app-#{index}"
  }
end

Parameters:

  • size (Integer) (defaults to: 5)

    Pool size (default: 5)

  • timeout (Numeric) (defaults to: 5000)

    Maximum time to wait for an available producer, in milliseconds (default: 5000); passed to connection_pool in seconds

  • producer_config (Proc)

    Block to configure each producer in the pool

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index

Returns:

  • (ConnectionPool)

    The newly created global pool



# File 'lib/waterdrop/connection_pool.rb', line 69

def setup(size: 5, timeout: 5000, &producer_config)
  ensure_connection_pool_gem!

  @default_pool = new(size: size, timeout: timeout, &producer_config)

  # Emit global event for pool setup
  WaterDrop.instrumentation.instrument(
    'connection_pool.setup',
    pool: @default_pool,
    size: size,
    timeout: timeout
  )

  @default_pool
end

.shutdown ⇒ Object

Shutdown the global connection pool



# File 'lib/waterdrop/connection_pool.rb', line 115

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    'connection_pool.shutdown',
    pool: pool
  )
end

.stats ⇒ Hash?

Get statistics about the global pool

Returns:

  • (Hash, nil)

    Pool statistics or nil if no global pool



# File 'lib/waterdrop/connection_pool.rb', line 105

def stats
  return nil unless @default_pool

  {
    size: @default_pool.size,
    available: @default_pool.available
  }
end
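
The two keys in the statistics hash are enough to derive how many producers are currently checked out. A minimal sketch, assuming a helper named pool_utilization (hypothetical, not part of WaterDrop) that accepts the hash returned by stats, or nil when no global pool exists:

```ruby
# Hypothetical helper: derives usage figures from a stats hash shaped like
# { size: Integer, available: Integer }. Returns nil when given nil, which
# matches the "no global pool" case of the class-level stats method.
def pool_utilization(stats)
  return nil if stats.nil?

  in_use = stats[:size] - stats[:available]

  {
    in_use: in_use,
    ratio: in_use.fdiv(stats[:size])
  }
end

busy = pool_utilization({ size: 10, available: 4 })
missing = pool_utilization(nil)
```

A ratio that stays close to 1.0 suggests callers may soon wait for a free producer, which is when the pool timeout becomes relevant.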

.transaction(&block) {|producer| ... } ⇒ Object

Execute a transaction with a producer from the global connection pool. Only available when the connection pool is configured.

Examples:

WaterDrop::ConnectionPool.transaction do |producer|
  producer.produce_async(topic: 'events', payload: 'data1')
  producer.produce_async(topic: 'events', payload: 'data2')
end

Parameters:

  • block (Proc)

    Block to execute within a transaction

Yields:

  • (producer)

    Producer from the global pool with an active transaction

Returns:

  • (Object)

    Result of the block

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 167

def transaction(&block)
  raise 'No global connection pool configured. Call setup first.' unless @default_pool

  @default_pool.transaction(&block)
end

.with(&block) {|producer| ... } ⇒ Object

Executes a block with a producer from the global pool

Examples:

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Parameters:

  • block (Proc)

    Block to execute with a producer

Yields:

  • (producer)

    Producer from the global pool

Returns:

  • (Object)

    Result of the block

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 96

def with(&block)
  raise 'No global connection pool configured. Call setup first.' unless @default_pool

  @default_pool.with(&block)
end
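
The lifecycle of the global pool (no pool, configured, shut down) can be sketched with stand-ins. The example below assumes two hypothetical names, GlobalPoolSketch and SketchPool, that imitate only the guard logic shown in the listings on this page; neither is WaterDrop code.

```ruby
# Hypothetical pool that always yields the same object.
SketchPool = Struct.new(:producer) do
  def with
    yield(producer)
  end

  def shutdown; end
end

# Hypothetical stand-in for the class-level API: a single pool stored in a
# class-level instance variable, with the same guards as the listings.
class GlobalPoolSketch
  class << self
    def setup(pool)
      @default_pool = pool
    end

    def active?
      !@default_pool.nil?
    end

    def with(&block)
      raise 'No global connection pool configured. Call setup first.' unless @default_pool

      @default_pool.with(&block)
    end

    def shutdown
      return unless @default_pool

      @default_pool.shutdown
      @default_pool = nil
    end
  end
end

# Before setup, with raises a RuntimeError.
before_setup =
  begin
    GlobalPoolSketch.with { |producer| producer }
  rescue RuntimeError => e
    e.message
  end

GlobalPoolSketch.setup(SketchPool.new(:producer_stub))
during = GlobalPoolSketch.with { |producer| producer }

GlobalPoolSketch.shutdown
after_shutdown = GlobalPoolSketch.active?
```

Checking active? first is one way to avoid the RuntimeError in code paths that may run before setup or after shutdown.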

Instance Method Details

#reload ⇒ Object

Reload all connections in the pool. Useful for configuration changes or error recovery.



# File 'lib/waterdrop/connection_pool.rb', line 259

def reload
  @pool.reload do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is reloaded
  WaterDrop.instrumentation.instrument(
    'connection_pool.reloaded',
    pool: self
  )
end

#shutdown ⇒ Object Also known as: close

Shutdown the connection pool



# File 'lib/waterdrop/connection_pool.rb', line 240

def shutdown
  @pool.shutdown do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is shut down
  WaterDrop.instrumentation.instrument(
    'connection_pool.shutdown',
    pool: self
  )
end
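
Both reload and shutdown close a producer only when `producer&.status&.active?` is truthy. A minimal sketch of that guard, assuming stand-in classes StatusStub and ClosableStub (hypothetical, not WaterDrop classes):

```ruby
# Hypothetical status object exposing the active? predicate.
StatusStub = Struct.new(:active) do
  def active?
    active
  end
end

# Hypothetical producer that records whether close! was called.
class ClosableStub
  attr_reader :status, :closed

  def initialize(active)
    @status = StatusStub.new(active)
    @closed = false
  end

  def close!
    @closed = true
  end
end

# Same guard as in the listings: safe navigation skips nil producers and
# producers without a status, and the predicate skips inactive ones.
def close_if_active(producer)
  producer.close! if producer&.status&.active?
end

active = ClosableStub.new(true)
inactive = ClosableStub.new(false)

[active, inactive, nil].each { |producer| close_if_active(producer) }
```

The nil entry shows why safe navigation is used: the guard simply returns without raising NoMethodError.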

#stats ⇒ Hash

Get pool statistics

Returns:

  • (Hash)

    Pool statistics



# File 'lib/waterdrop/connection_pool.rb', line 232

def stats
  {
    size: @pool.size,
    available: @pool.available
  }
end

#transaction {|producer| ... } ⇒ Object

Execute a transaction with a producer from this connection pool

Examples:

pool.transaction do |producer|
  producer.produce_async(topic: 'events', payload: 'data1')
  producer.produce_async(topic: 'events', payload: 'data2')
end

Yields:

  • (producer)

    Producer from the pool with an active transaction

Returns:

  • (Object)

    Result of the block



# File 'lib/waterdrop/connection_pool.rb', line 281

def transaction
  with do |producer|
    producer.transaction do
      yield(producer)
    end
  end
end
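
The nesting above (check out a producer, open a transaction on it, yield it) can be traced with stand-ins. In this sketch TracingProducer and TracingPool are hypothetical names, and the begin/commit/abort log is an assumption made for illustration rather than a description of how a real producer handles transactions.

```ruby
# Hypothetical producer that logs transaction boundaries.
class TracingProducer
  attr_reader :log

  def initialize
    @log = []
  end

  def transaction
    @log << :begin
    result = yield
    @log << :commit
    result
  rescue StandardError
    @log << :abort
    raise
  end
end

# Hypothetical pool with a single producer, using the same nesting as the
# listing: with -> producer.transaction -> yield(producer).
class TracingPool
  def initialize(producer)
    @producer = producer
  end

  def with
    yield(@producer)
  end

  def transaction
    with do |producer|
      producer.transaction do
        yield(producer)
      end
    end
  end
end

producer = TracingProducer.new
pool = TracingPool.new(producer)

# The value of the block is returned through both layers.
result = pool.transaction do |prod|
  prod.log << :work
  :done
end

begin
  pool.transaction { |_prod| raise ArgumentError, 'boom' }
rescue ArgumentError
  # The error reaches the caller after the stand-in records the abort.
end
```

Since the producer is checked out for the whole block, it is unavailable to other callers until the block finishes, so long-running work inside the block reduces effective pool capacity.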