Class: Karafka::Pro::Swarm::LivenessListener
- Inherits:
-
Swarm::LivenessListener
- Object
- Swarm::LivenessListener
- Karafka::Pro::Swarm::LivenessListener
- Defined in:
- lib/karafka/pro/swarm/liveness_listener.rb
Overview
This listener should not break anything if subscribed in the supervisor prior to forking as it relies on server events for operations.
Pro listener that monitors RSS usage and other heartbeat metrics (if configured) to ensure that everything operates.
It can: - monitor poll frequency to make sure things are not polled not often enough - monitor consumption to make sure we do not process data for too long - monitor RSS to make sure that we do not use too much memory
By default it does not monitor memory and consuming and polling is configured in such a way to align with max.poll.interval.ms
and other defaults.
Failure statuses reported are as follows: - 1 - polling ttl exceeded - 2 - consuming ttl exceeded - 3 - memory limit exceeded
Instance Method Summary collapse
-
#initialize(memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener
constructor
A new instance of LivenessListener.
-
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch.
-
#on_connection_listener_stopped(_event) ⇒ Object
Deregister the polling tracker for given listener.
-
#on_connection_listener_stopping(_event) ⇒ Object
Deregister the polling tracker for given listener.
- #on_error_occurred(_event) ⇒ Object
-
#on_statistics_emitted(_event) ⇒ Object
Reports the current status once in a while.
Constructor Details
#initialize(memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener
The default TTL matches the default max.poll.interval.ms
Returns a new instance of LivenessListener.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 36 def initialize( memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000 ) @polling_ttl = polling_ttl @consuming_ttl = consuming_ttl # We cast it just in case someone would provide '10MB' or something similar @memory_limit = memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit @pollings = {} @consumptions = {} super() end |
Instance Method Details
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch
54 55 56 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 54 def on_connection_listener_fetch_loop(_event) mark_polling_tick end |
#on_connection_listener_stopped(_event) ⇒ Object
Deregister the polling tracker for given listener
112 113 114 115 116 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 112 def on_connection_listener_stopped(_event) return if Karafka::App.done? clear_polling_tick end |
#on_connection_listener_stopping(_event) ⇒ Object
Deregister the polling tracker for given listener
100 101 102 103 104 105 106 107 108 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 100 def on_connection_listener_stopping(_event) # We are interested in disabling tracking for given listener only if it was requested # when karafka was running. If we would always clear, it would not catch the shutdown # polling requirements. The "running" listener shutdown operations happen only when # the manager requests it for downscaling. return if Karafka::App.done? clear_polling_tick end |
#on_error_occurred(_event) ⇒ Object
80 81 82 83 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 80 def on_error_occurred(_event) clear_consumption_tick clear_polling_tick end |
#on_statistics_emitted(_event) ⇒ Object
Reports the current status once in a while
88 89 90 91 92 93 94 95 96 |
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 88 def on_statistics_emitted(_event) periodically do return unless node current_status = status current_status.positive? ? node.unhealthy(current_status) : node.healthy end end |