Class: Karafka::Swarm::Manager
- Inherits:
-
Object
- Object
- Karafka::Swarm::Manager
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/swarm/manager.rb
Overview
This is intended to run in the supervisor under mutexes (when needed)
Manager similar to the one for threads but managing processing nodes It starts nodes and keeps an eye on them.
In any of the nodes is misbehaving (based on liveness listener) it will be restarted. Initially gracefully but if won’t stop itself, it will be forced to.
Instance Attribute Summary collapse
-
#nodes ⇒ Array<Node>
readonly
All nodes that manager manages.
Instance Method Summary collapse
-
#cleanup ⇒ Object
Collects all processes statuses.
-
#control ⇒ Object
Checks on nodes if they are ok one after another.
-
#initialize ⇒ Manager
constructor
Initializes the swarm manager with empty nodes.
-
#quiet ⇒ Object
Attempts to quiet all the nodes.
-
#signal(signal) ⇒ Object
Sends given signal to all nodes.
-
#start ⇒ Object
Starts all the expected nodes for the first time.
-
#stop ⇒ Object
Attempts to stop all the nodes.
-
#stopped? ⇒ Boolean
True if none of the nodes is running.
-
#terminate ⇒ Object
Terminates all the nodes.
Constructor Details
#initialize ⇒ Manager
Initializes the swarm manager with empty nodes
33 34 35 36 |
# File 'lib/karafka/swarm/manager.rb', line 33 def initialize @nodes = [] @statuses = Hash.new { |h, k| h[k] = {} } end |
Instance Attribute Details
#nodes ⇒ Array<Node> (readonly)
Returns All nodes that manager manages.
30 31 32 |
# File 'lib/karafka/swarm/manager.rb', line 30 def nodes @nodes end |
Instance Method Details
#cleanup ⇒ Object
Collects all processes statuses
63 64 65 |
# File 'lib/karafka/swarm/manager.rb', line 63 def cleanup @nodes.each(&:cleanup) end |
#control ⇒ Object
Checks on nodes if they are ok one after another
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/karafka/swarm/manager.rb', line 79 def control monitor.instrument('swarm.manager.control', caller: self) do @nodes.each do |node| statuses = @statuses[node] if node.alive? next if terminate_if_hanging(statuses, node) next if stop_if_not_healthy(statuses, node) next if stop_if_not_responding(statuses, node) else next if cleanup_one(statuses, node) next if restart_after_timeout(statuses, node) end end end end |
#quiet ⇒ Object
Attempts to quiet all the nodes
48 49 50 |
# File 'lib/karafka/swarm/manager.rb', line 48 def quiet @nodes.each(&:quiet) end |
#signal(signal) ⇒ Object
Sends given signal to all nodes
69 70 71 |
# File 'lib/karafka/swarm/manager.rb', line 69 def signal(signal) @nodes.each { |node| node.signal(signal) } end |
#start ⇒ Object
Starts all the expected nodes for the first time
39 40 41 42 43 44 45 |
# File 'lib/karafka/swarm/manager.rb', line 39 def start parent_pid = ::Process.pid @nodes = Array.new(nodes_count) do |i| start_one Node.new(i, parent_pid) end end |
#stop ⇒ Object
Attempts to stop all the nodes
53 54 55 |
# File 'lib/karafka/swarm/manager.rb', line 53 def stop @nodes.each(&:stop) end |
#stopped? ⇒ Boolean
Returns true if none of the nodes is running.
74 75 76 |
# File 'lib/karafka/swarm/manager.rb', line 74 def stopped? @nodes.none?(&:alive?) end |
#terminate ⇒ Object
Terminates all the nodes
58 59 60 |
# File 'lib/karafka/swarm/manager.rb', line 58 def terminate @nodes.each(&:terminate) end |