Class: Karafka::Swarm::Manager
- Inherits:
-
Object
- Object
- Karafka::Swarm::Manager
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/swarm/manager.rb
Overview
This is intended to run in the supervisor under mutexes (when needed)
Manager similar to the one for threads but managing processing nodes It starts nodes and keeps an eye on them.
In any of the nodes is misbehaving (based on liveness listener) it will be restarted. Initially gracefully but if won’t stop itself, it will be forced to.
Instance Attribute Summary collapse
-
#nodes ⇒ Array<Node>
readonly
All nodes that manager manages.
Instance Method Summary collapse
-
#cleanup ⇒ Object
Collects all processes statuses.
-
#control ⇒ Object
Checks on nodes if they are ok one after another.
-
#initialize ⇒ Manager
constructor
A new instance of Manager.
-
#quiet ⇒ Object
Attempts to quiet all the nodes.
-
#signal(signal) ⇒ Object
Sends given signal to all nodes.
-
#start ⇒ Object
Starts all the expected nodes for the first time.
-
#stop ⇒ Object
Attempts to stop all the nodes.
-
#stopped? ⇒ Boolean
True if none of the nodes is running.
-
#terminate ⇒ Object
Terminates all the nodes.
Constructor Details
#initialize ⇒ Manager
Returns a new instance of Manager.
32 33 34 35 |
# File 'lib/karafka/swarm/manager.rb', line 32 def initialize @nodes = [] @statuses = Hash.new { |h, k| h[k] = {} } end |
Instance Attribute Details
#nodes ⇒ Array<Node> (readonly)
Returns All nodes that manager manages.
30 31 32 |
# File 'lib/karafka/swarm/manager.rb', line 30 def nodes @nodes end |
Instance Method Details
#cleanup ⇒ Object
Collects all processes statuses
62 63 64 |
# File 'lib/karafka/swarm/manager.rb', line 62 def cleanup @nodes.each(&:cleanup) end |
#control ⇒ Object
Checks on nodes if they are ok one after another
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/karafka/swarm/manager.rb', line 78 def control monitor.instrument('swarm.manager.control', caller: self) do @nodes.each do |node| statuses = @statuses[node] if node.alive? next if terminate_if_hanging(statuses, node) next if stop_if_not_healthy(statuses, node) next if stop_if_not_responding(statuses, node) else next if cleanup_one(statuses, node) next if restart_after_timeout(statuses, node) end end end end |
#quiet ⇒ Object
Attempts to quiet all the nodes
47 48 49 |
# File 'lib/karafka/swarm/manager.rb', line 47 def quiet @nodes.each(&:quiet) end |
#signal(signal) ⇒ Object
Sends given signal to all nodes
68 69 70 |
# File 'lib/karafka/swarm/manager.rb', line 68 def signal(signal) @nodes.each { |node| node.signal(signal) } end |
#start ⇒ Object
Starts all the expected nodes for the first time
38 39 40 41 42 43 44 |
# File 'lib/karafka/swarm/manager.rb', line 38 def start parent_pid = ::Process.pid @nodes = Array.new(nodes_count) do |i| start_one Node.new(i, parent_pid) end end |
#stop ⇒ Object
Attempts to stop all the nodes
52 53 54 |
# File 'lib/karafka/swarm/manager.rb', line 52 def stop @nodes.each(&:stop) end |
#stopped? ⇒ Boolean
Returns true if none of the nodes is running.
73 74 75 |
# File 'lib/karafka/swarm/manager.rb', line 73 def stopped? @nodes.none?(&:alive?) end |
#terminate ⇒ Object
Terminates all the nodes
57 58 59 |
# File 'lib/karafka/swarm/manager.rb', line 57 def terminate @nodes.each(&:terminate) end |