Class: WaterDrop::Instrumentation::IdleDisconnectorListener

Inherits:
Object
  • Object
Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/waterdrop/instrumentation/idle_disconnector_listener.rb

Overview

Note:

There is no need to account for running transactions or a buffer that is in use: the disconnect is graceful and will not happen unless it is allowed to. This is why the listener can keep things simple and track only txmsgs.

Note:

For convenience, WaterDrop provides a config shortcut. Instead of manually subscribing this listener, you can simply set config.idle_disconnect_timeout in your producer config.

Idle disconnector listener that monitors producer activity and automatically disconnects idle producers to conserve TCP connections.

This listener subscribes to statistics.emitted events and tracks the txmsgs (transmitted messages) count. If the count does not change for a configurable timeout period, meaning the producer has not sent any messages, the listener automatically disconnects the producer.
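The tracking described above can be sketched roughly as follows. This is a minimal illustration, not WaterDrop's implementation: the class name IdleTrackerSketch, the injected clock, the returned symbols and the string key 'txmsgs' are all assumptions made for this example.

```ruby
# Hypothetical, simplified stand-in for the listener's idle detection.
# It only decides what should happen; it does not touch any producer.
class IdleTrackerSketch
  DEFAULT_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) }

  # @param timeout_ms [Integer] idle time in milliseconds before signalling a disconnect
  # @param clock [#call] returns the current monotonic time in milliseconds (injected for testing)
  def initialize(timeout_ms, clock: DEFAULT_CLOCK)
    @timeout_ms = timeout_ms
    @clock = clock
    # -1 guarantees that the first statistics reading counts as activity
    @last_txmsgs = -1
    @last_activity_time = @clock.call
  end

  # @param statistics [Hash] statistics payload (assumed to use the 'txmsgs' string key)
  # @return [Symbol] :active, :waiting or :disconnect
  def call(statistics)
    now = @clock.call
    current = statistics.fetch('txmsgs', 0)

    # The counter moved, so the producer sent something since the last emission
    if current != @last_txmsgs
      @last_txmsgs = current
      @last_activity_time = now
      return :active
    end

    # No new messages, but the timeout has not elapsed yet
    return :waiting if now - @last_activity_time < @timeout_ms

    # Reset the timer so a disconnect is not signalled on every later emission
    @last_activity_time = now
    :disconnect
  end
end
```

With a one-second timeout and an unchanged txmsgs count, this sketch returns :waiting until a full second has passed since the last change and :disconnect after that.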

Examples:

Using config shortcut (recommended)

WaterDrop::Producer.new do |config|
  config.idle_disconnect_timeout = 5 * 60 * 1000 # 5 minutes
end

Manual listener usage with 5 minute timeout

producer.monitor.subscribe(
  WaterDrop::Instrumentation::IdleDisconnectorListener.new(
    producer,
    disconnect_timeout: 5 * 60 * 1000
  )
)

Usage with custom timeout

idle_disconnector = WaterDrop::Instrumentation::IdleDisconnectorListener.new(
  producer,
  disconnect_timeout: 10 * 60 * 1000
)
producer.monitor.subscribe(idle_disconnector)

Instance Method Summary

Constructor Details

#initialize(producer, disconnect_timeout: 5 * 60 * 1_000) ⇒ IdleDisconnectorListener

Returns a new instance of IdleDisconnectorListener.

Parameters:

  • producer (WaterDrop::Producer)

    the producer instance to monitor

  • disconnect_timeout (Integer) (defaults to: 5 * 60 * 1_000)

    timeout in milliseconds before disconnecting (default: 5 minutes). Be aware that if you set it to a value lower than the statistics publishing interval (5 seconds by default), it may be too aggressive in closing
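Because the timeout is expressed in milliseconds, it can help to compute it from minutes and compare it with the statistics interval before passing it in. The helpers below are hypothetical and not part of WaterDrop; the 5_000 constant only mirrors the default interval mentioned above.

```ruby
# Hypothetical helpers for illustration only (not part of WaterDrop).

# Default statistics publishing interval quoted in the parameter description
STATISTICS_INTERVAL_MS = 5_000

# Converts minutes to milliseconds, matching the 5 * 60 * 1_000 form used in the signature
def minutes_to_ms(minutes)
  minutes * 60 * 1_000
end

# True when the timeout is shorter than the interval at which statistics are published,
# which is the case the parameter description warns about
def timeout_below_statistics_interval?(timeout_ms, interval_ms = STATISTICS_INTERVAL_MS)
  timeout_ms < interval_ms
end
```

For example, minutes_to_ms(5) gives the default of 300_000, while a timeout of 3_000 falls below the default interval.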



# File 'lib/waterdrop/instrumentation/idle_disconnector_listener.rb', line 44

def initialize(producer, disconnect_timeout: 5 * 60 * 1_000)
  @producer = producer
  @disconnect_timeout = disconnect_timeout
  # We set this initially to -1 so any statistics change triggers a change to prevent an
  # early shutdown
  @last_txmsgs = -1
  @last_activity_time = monotonic_now
end

Instance Method Details

#on_statistics_emitted(event) ⇒ Object

Called automatically on each statistics.emitted event once the listener has been subscribed to the monitor using producer.monitor.subscribe(listener_instance).

Parameters:

  • event (Hash)

    the statistics event containing producer statistics



# File 'lib/waterdrop/instrumentation/idle_disconnector_listener.rb', line 57

def on_statistics_emitted(event)
  call(event[:statistics])
end
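To illustrate how a subscribed listener object ends up receiving this call, here is a minimal sketch of name-based event dispatch. TinyMonitorSketch and RecordingListener are hypothetical classes written for this example; the real monitor is more involved, and the assumption here is only that an event id such as statistics.emitted maps to a method named on_statistics_emitted.

```ruby
# Hypothetical miniature monitor, only to illustrate name-based dispatch.
class TinyMonitorSketch
  def initialize
    @listeners = []
  end

  # Registers a listener object that may define on_* handler methods
  def subscribe(listener)
    @listeners << listener
  end

  # Maps an event id such as 'statistics.emitted' to 'on_statistics_emitted'
  # and calls it on every listener that defines that method
  def instrument(event_id, payload = {})
    method_name = "on_#{event_id.tr('.', '_')}"

    @listeners.each do |listener|
      listener.public_send(method_name, payload) if listener.respond_to?(method_name)
    end
  end
end

# Hypothetical listener that records the statistics it receives
class RecordingListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Same shape as the method above: pull the statistics out of the event
  def on_statistics_emitted(event)
    @seen << event[:statistics]
  end
end

monitor = TinyMonitorSketch.new
listener = RecordingListener.new
monitor.subscribe(listener)

# Delivered to the listener because it defines on_statistics_emitted
monitor.instrument('statistics.emitted', { statistics: { 'txmsgs' => 3 } })
# Ignored by the listener because it has no matching handler
monitor.instrument('message.produced_async', { message: {} })
```

Only the first event reaches the listener, since it defines a handler for that event alone.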