Class: Karafka::Pro::Swarm::LivenessListener

Inherits:
Swarm::LivenessListener
Defined in:
lib/karafka/pro/swarm/liveness_listener.rb

Overview

Note:

This listener should not break anything if subscribed in the supervisor prior to forking as it relies on server events for operations.

Pro listener that monitors RSS usage and other heartbeat metrics (if configured) to ensure that the process is operating correctly.

It can:

  • monitor poll frequency to make sure polling happens often enough

  • monitor consumption to make sure we do not process data for too long

  • monitor RSS to make sure that we do not use too much memory

By default it does not monitor memory. The consuming and polling TTLs are configured to align with max.poll.interval.ms and other defaults.

Failure statuses reported are as follows:

  • 1 - polling ttl exceeded

  • 2 - consuming ttl exceeded

  • 3 - memory limit exceeded
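The listener's own status computation is not shown in this section. As a minimal sketch, assuming the checks run in the order the codes are listed and that 0 means healthy, the mapping could look like this. The `liveness_status` helper and its keyword arguments are hypothetical and not part of Karafka:

```ruby
# Hypothetical helper, not part of Karafka. Maps tracked ticks and RSS usage
# to one of the status codes listed above (0 is assumed to mean "healthy").
def liveness_status(now_ms:, pollings:, consumptions:, rss_mb:,
                    polling_ttl: 5 * 60 * 1_000,
                    consuming_ttl: 5 * 60 * 1_000,
                    memory_limit: Float::INFINITY)
  # 1: at least one listener has not polled within the polling TTL
  return 1 if pollings.values.any? { |tick| now_ms - tick > polling_ttl }
  # 2: at least one consumption has been running longer than the consuming TTL
  return 2 if consumptions.values.any? { |tick| now_ms - tick > consuming_ttl }
  # 3: resident set size exceeds the configured limit (in MB)
  return 3 if rss_mb > memory_limit

  0
end
```

With the default `memory_limit` of `Float::INFINITY`, the third check can never trigger, which matches the statement that memory is not monitored by default.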

Constructor Details

#initialize(memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener

Note:

The default TTL matches the default max.poll.interval.ms

Returns a new instance of LivenessListener.

Parameters:

  • memory_limit (Integer) (defaults to: Float::INFINITY)

    max memory in MB for this process to be considered healthy

  • consuming_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    time in ms after which we consider consumption hanging. It defines the maximum consumption time after which the supervisor should consider the given process as hanging

  • polling_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    max time in ms between polls. If no polling (of any kind) happens within this time, the process should be considered dead.



# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 44

def initialize(
  memory_limit: Float::INFINITY,
  consuming_ttl: 5 * 60 * 1_000,
  polling_ttl: 5 * 60 * 1_000
)
  @polling_ttl = polling_ttl
  @consuming_ttl = consuming_ttl
  # We cast it just in case someone would provide '10MB' or something similar
  @memory_limit = memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit
  @pollings = {}
  @consumptions = {}

  super()
end
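The cast in the constructor relies on `String#to_i`, which reads only the leading digits and ignores any unit suffix. The sketch below isolates that behavior in a hypothetical `normalize_memory_limit` helper (not part of Karafka) and spells out the default TTL arithmetic:

```ruby
# Hypothetical helper mirroring the memory_limit cast in the constructor above.
def normalize_memory_limit(memory_limit)
  # String#to_i parses leading digits only, so the unit suffix is dropped
  memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit
end

# Default TTL used for both consuming_ttl and polling_ttl, in milliseconds
DEFAULT_TTL_MS = 5 * 60 * 1_000

puts normalize_memory_limit('10MB') # prints 10
puts normalize_memory_limit('1GB')  # prints 1 (units are not converted)
puts normalize_memory_limit(2048)   # prints 2048
puts DEFAULT_TTL_MS                 # prints 300000
```

Because the suffix is discarded rather than converted, a value such as `'1GB'` would be treated as 1 MB. Passing a plain number of megabytes avoids that ambiguity.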

Instance Method Details

#on_connection_listener_fetch_loop(_event) ⇒ Object

Tick on each fetch

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 62

def on_connection_listener_fetch_loop(_event)
  mark_polling_tick
end

#on_connection_listener_stopped(_event) ⇒ Object

Deregister the polling tracker for given listener

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 120

def on_connection_listener_stopped(_event)
  return if Karafka::App.done?

  clear_polling_tick
end

#on_connection_listener_stopping(_event) ⇒ Object

Deregister the polling tracker for given listener

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 108

def on_connection_listener_stopping(_event)
  # We are interested in disabling tracking for given listener only if it was requested
  # when karafka was running. If we would always clear, it would not catch the shutdown
  # polling requirements. The "running" listener shutdown operations happen only when
  # the manager requests it for downscaling.
  return if Karafka::App.done?

  clear_polling_tick
end
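The `mark_polling_tick` and `clear_polling_tick` helpers are not shown in this section. The following is a rough sketch of the idea described in the comments above, assuming ticks are stored per listener and that clearing is skipped once the application is done. The `PollingTracker` class, its method names, and the `app_done:` flag are hypothetical stand-ins, not Karafka's implementation:

```ruby
# Hypothetical stand-in for the polling tick bookkeeping. Not Karafka code.
class PollingTracker
  def initialize(ttl_ms)
    @ttl_ms = ttl_ms
    @ticks = {}
    @mutex = Mutex.new
  end

  # Records the time (in ms) of the most recent poll for a listener
  def mark(listener_id, now_ms)
    @mutex.synchronize { @ticks[listener_id] = now_ms }
  end

  # Stops tracking a listener, but only while the app is still running.
  # During shutdown the tick is kept so a hanging shutdown is still detected.
  def clear(listener_id, app_done:)
    return if app_done

    @mutex.synchronize { @ticks.delete(listener_id) }
  end

  # True when any tracked listener has not polled within the TTL
  def stale?(now_ms)
    @mutex.synchronize { @ticks.values.any? { |tick| now_ms - tick > @ttl_ms } }
  end
end
```

In this sketch, a listener stopped for downscaling is removed and no longer counts against the TTL, while a listener that stops during shutdown stays tracked.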

#on_error_occurred(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 88

def on_error_occurred(_event)
  clear_consumption_tick
  clear_polling_tick
end

#on_statistics_emitted(_event) ⇒ Object

Reports the current status once in a while

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 96

def on_statistics_emitted(_event)
  periodically do
    return unless node

    current_status = status

    current_status.positive? ? node.unhealthy(current_status) : node.healthy
  end
end
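To illustrate the reporting branch in isolation, here is a small sketch using a hypothetical `RecordingNode` in place of the real swarm node. Only the `healthy` and `unhealthy(status)` calls are taken from the method above; the rest is assumed for illustration:

```ruby
# Hypothetical node double that records what would be reported to the supervisor.
class RecordingNode
  attr_reader :reports

  def initialize
    @reports = []
  end

  def healthy
    @reports << [:healthy, 0]
  end

  def unhealthy(code)
    @reports << [:unhealthy, code]
  end
end

# Mirrors the branch in on_statistics_emitted: do nothing without a node,
# report unhealthy for any positive status, healthy otherwise.
def report_status(node, current_status)
  return unless node

  current_status.positive? ? node.unhealthy(current_status) : node.healthy
end

node = RecordingNode.new
report_status(node, 0)
report_status(node, 2)
p node.reports # prints [[:healthy, 0], [:unhealthy, 2]]
```

The early return on a missing node means that calling the reporting code outside of a swarm node is a no-op in this sketch.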