Class: Karafka::Swarm::Manager

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/swarm/manager.rb

Overview

Note:

This is intended to run in the supervisor, under mutexes when needed.

A manager similar to the one for threads, but managing processing nodes. It starts the nodes and keeps an eye on them.

If any of the nodes is misbehaving (based on the liveness listener), it will be restarted: initially gracefully, but if it does not stop by itself, it will be forced to.
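The "gracefully first, then forced" behavior can be pictured as a small per-node state machine that advances on each control pass. The sketch below is hypothetical: `FakeNode`, the `:stop_sent_at` key, the injected `now` clock and the 5-second `STOP_TIMEOUT` are all assumptions for illustration, not Karafka's actual implementation or defaults.

```ruby
# Hypothetical grace period; not a Karafka default
STOP_TIMEOUT = 5

# Stand-in for a swarm node. A plain class (not a Struct) is used so it is
# hashed by identity and stays a stable Hash key while its state changes.
class FakeNode
  attr_reader :actions
  attr_accessor :healthy

  def initialize
    @alive = true
    @healthy = false
    @actions = []
  end

  def alive?
    @alive
  end

  def stop
    @actions << :stop
  end

  def terminate
    @actions << :terminate
    @alive = false
  end
end

# One control pass for one node; `status` is that node's own status hash
def check(node, status, now)
  return unless node.alive?
  return if node.healthy

  if status[:stop_sent_at].nil?
    node.stop # graceful attempt first
    status[:stop_sent_at] = now
  elsif now - status[:stop_sent_at] >= STOP_TIMEOUT
    node.terminate # still alive after the grace period: force it
  end
end

statuses = Hash.new { |h, k| h[k] = {} }
node = FakeNode.new

# Three control passes at t = 0, 2 and 6 seconds
[0, 2, 6].each { |t| check(node, statuses[node], t) }
p node.actions # => [:stop, :terminate]
```

The pass at t = 2 does nothing because the grace period has not yet elapsed; only the pass at t = 6 escalates.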

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ Manager

Returns a new instance of Manager.



# File 'lib/karafka/swarm/manager.rb', line 32

def initialize
  @nodes = []
  @statuses = Hash.new { |h, k| h[k] = {} }
end
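The `@statuses` hash uses a default block, so every node gets its own empty status hash the first time it is looked up, and that hash is stored under the node. The sketch below contrasts this with `Hash.new({})`, whose single default object would be shared by all missing keys and never stored. Plain `Object.new` instances stand in for `Node` objects here (an assumption for illustration).

```ruby
# Per-key default block: a fresh hash per missing key, stored on first access
statuses = Hash.new { |h, k| h[k] = {} }
node_a = Object.new # stand-ins for Node instances
node_b = Object.new

statuses[node_a][:stop_sent_at] = 123
p statuses[node_b].empty? # => true
p statuses.size           # => 2 (both lookups stored an entry)

# Contrast: one shared default object, returned for every missing key
shared = Hash.new({})
shared[node_a][:stop_sent_at] = 123 # mutates the shared default, stores nothing
p shared[node_b][:stop_sent_at]     # => 123 (node_a's data leaks to node_b)
p shared.size                       # => 0
```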

Instance Attribute Details

#nodes ⇒ Array<Node> (readonly)

Returns all nodes that the manager manages.

Returns:

  • (Array<Node>)

    All nodes that the manager manages



# File 'lib/karafka/swarm/manager.rb', line 30

def nodes
  @nodes
end

Instance Method Details

#cleanup ⇒ Object

Collects the statuses of all node processes



# File 'lib/karafka/swarm/manager.rb', line 62

def cleanup
  @nodes.each(&:cleanup)
end
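`Node#cleanup` itself is not shown on this page. Assuming it reaps the forked node process without blocking (an assumption, not confirmed here), the underlying Ruby mechanism looks roughly like the sketch below: `Process.wait2` with `Process::WNOHANG` returns `nil` while the child is still running and `[pid, status]` once it has exited. The `reap` helper is hypothetical, and the example requires a platform that supports `Process.fork`.

```ruby
# Hypothetical helper: non-blocking reap of a child process
def reap(pid)
  # nil while the child is running, [pid, Process::Status] once it exited
  Process.wait2(pid, Process::WNOHANG)
rescue Errno::ECHILD
  # Already reaped, or not our child
  nil
end

# A child that exits immediately with status 3
pid = Process.fork { exit!(3) }

status = nil
until status
  _, status = reap(pid)
  sleep(0.01) unless status # poll instead of blocking the supervisor
end

p status.exitstatus # => 3
p reap(pid)         # => nil (nothing left to collect)
```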

#control ⇒ Object

Checks the nodes one after another to see whether they are OK



# File 'lib/karafka/swarm/manager.rb', line 78

def control
  monitor.instrument('swarm.manager.control', caller: self) do
    @nodes.each do |node|
      statuses = @statuses[node]

      if node.alive?
        next if terminate_if_hanging(statuses, node)
        next if stop_if_not_healthy(statuses, node)
        next if stop_if_not_responding(statuses, node)
      else
        next if cleanup_one(statuses, node)
        next if restart_after_timeout(statuses, node)
      end
    end
  end
end
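The `next if ...` chain in `#control` only works if each check returns a truthy value when it acted on the node, so at most one action is taken per node per pass. The sketch below reproduces that short-circuit pattern with hypothetical stand-ins (`FakeNode`, three simplified checks); the real private methods (`terminate_if_hanging`, `stop_if_not_healthy`, `stop_if_not_responding`, `cleanup_one`, `restart_after_timeout`) are not shown on this page and behave differently.

```ruby
# Hypothetical node state: id, alive, hanging, healthy
FakeNode = Struct.new(:id, :alive, :hanging, :healthy) do
  def alive?
    alive
  end
end

# Each check returns true when it took an action, false otherwise
def terminate_if_hanging(node, actions)
  return false unless node.hanging

  actions << [node.id, :terminate]
  true
end

def stop_if_not_healthy(node, actions)
  return false if node.healthy

  actions << [node.id, :stop]
  true
end

def restart(node, actions)
  actions << [node.id, :restart]
  true
end

def control(nodes)
  actions = []
  nodes.each do |node|
    if node.alive?
      # A truthy result skips the remaining checks for this node
      next if terminate_if_hanging(node, actions)
      next if stop_if_not_healthy(node, actions)
    else
      next if restart(node, actions)
    end
  end
  actions
end

nodes = [
  FakeNode.new(0, true, false, true),  # fine: no action
  FakeNode.new(1, true, true, false),  # hanging and unhealthy: only terminated
  FakeNode.new(2, true, false, false), # unhealthy: stopped
  FakeNode.new(3, false, false, true)  # dead: restarted
]

p control(nodes) # => [[1, :terminate], [2, :stop], [3, :restart]]
```

Ordering matters: node 1 is both hanging and unhealthy, but because the hanging check comes first, it is terminated and never also stopped in the same pass.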

#quiet ⇒ Object

Attempts to quiet all the nodes



# File 'lib/karafka/swarm/manager.rb', line 47

def quiet
  @nodes.each(&:quiet)
end

#signal(signal) ⇒ Object

Sends the given signal to all nodes

Parameters:

  • signal (String)

    signal name



# File 'lib/karafka/swarm/manager.rb', line 68

def signal(signal)
  @nodes.each { |node| node.signal(signal) }
end
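How `Node#signal` delivers the signal is not shown on this page. As a hypothetical sketch of the fan-out idea, the code below forks two child processes that trap `USR1`, then sends the same signal name to each with `Process.kill`, which accepts a signal name string. The `spawn_node` helper and the exit codes are assumptions, and the example needs a platform that supports `Process.fork` and `USR1`.

```ruby
# Hypothetical child "node" that exits with `code` when it receives USR1
def spawn_node(code)
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    Signal.trap('USR1') { exit!(code) }
    writer.puts('ready') # tell the parent the trap is installed
    writer.close
    sleep
  end
  writer.close
  reader.gets # block until the child is ready, so the signal is not lost
  reader.close
  pid
end

pids = [spawn_node(10), spawn_node(11)]

# Fan the same signal name out to every child
pids.each { |pid| Process.kill('USR1', pid) }

codes = pids.map { |pid| Process.wait2(pid).last.exitstatus }
p codes # => [10, 11]
```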

#start ⇒ Object

Starts all the expected nodes for the first time



# File 'lib/karafka/swarm/manager.rb', line 38

def start
  parent_pid = ::Process.pid

  @nodes = Array.new(nodes_count) do |i|
    start_one Node.new(i, parent_pid)
  end
end
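`Array.new(count) { |i| ... }` stores whatever the block returns, so `start_one` has to return the node it started for `@nodes` to hold `Node` instances. The sketch below shows that shape with a hypothetical `Node` struct and a `start_one` that only returns its argument; the real `start_one` is not shown on this page and presumably forks the node process.

```ruby
# Hypothetical stand-in for Karafka's Node
Node = Struct.new(:id, :parent_pid)

# Hypothetical: the real method would start the node; here it just returns it
def start_one(node)
  node
end

def start(nodes_count)
  parent_pid = Process.pid

  # Each element is the block's return value, i.e. whatever start_one returns
  Array.new(nodes_count) { |i| start_one(Node.new(i, parent_pid)) }
end

nodes = start(3)
p nodes.map(&:id) # => [0, 1, 2]
```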

#stop ⇒ Object

Attempts to stop all the nodes



# File 'lib/karafka/swarm/manager.rb', line 52

def stop
  @nodes.each(&:stop)
end

#stopped? ⇒ Boolean

Returns true if none of the nodes is running.

Returns:

  • (Boolean)

    true if none of the nodes is running



# File 'lib/karafka/swarm/manager.rb', line 73

def stopped?
  @nodes.none?(&:alive?)
end
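Because `Enumerable#none?` returns `true` for an empty collection, `#stopped?` also reports `true` before `#start` has been called, while `@nodes` is still `[]`. A small sketch with a hypothetical `FakeNode` struct:

```ruby
# Hypothetical stand-in exposing only alive?
FakeNode = Struct.new(:alive) do
  def alive?
    alive
  end
end

def stopped?(nodes)
  nodes.none?(&:alive?)
end

p stopped?([])                                          # => true (no nodes yet)
p stopped?([FakeNode.new(false), FakeNode.new(false)])  # => true
p stopped?([FakeNode.new(false), FakeNode.new(true)])   # => false
```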

#terminate ⇒ Object

Terminates all the nodes



# File 'lib/karafka/swarm/manager.rb', line 57

def terminate
  @nodes.each(&:terminate)
end
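`#stop`, `#stopped?` and `#terminate` combine naturally into a graceful-then-forced shutdown. The sketch below is a hypothetical supervisor loop, not Karafka's actual supervisor: `FakeManager`, the `shutdown` helper, the timeout and the polling interval are all assumptions. It asks nodes to stop, polls `stopped?` against a monotonic-clock deadline, and only calls `terminate` if the deadline passes.

```ruby
# Hypothetical manager whose nodes may or may not honor a graceful stop
class FakeManager
  attr_reader :calls

  def initialize(stops_gracefully)
    @calls = []
    @alive = true
    @stops_gracefully = stops_gracefully
  end

  def stop
    @calls << :stop
    @alive = false if @stops_gracefully
  end

  def terminate
    @calls << :terminate
    @alive = false
  end

  def stopped?
    !@alive
  end
end

# Graceful first; forced only once the deadline has passed
def shutdown(manager, timeout:, poll: 0.01)
  manager.stop
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  until manager.stopped?
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      manager.terminate
      break
    end

    sleep(poll)
  end

  manager.calls
end

p shutdown(FakeManager.new(true), timeout: 0.05)  # => [:stop]
p shutdown(FakeManager.new(false), timeout: 0.05) # => [:stop, :terminate]
```

A monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or extend the grace period.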