Class: Karafka::Swarm::Node

Inherits:
Object
Defined in:
lib/karafka/swarm/node.rb

Overview

Note:

Some of these APIs are for the parent process only

Note:

Keep in mind that this class is used in both the forks and the supervisor, and has a slightly different role in each. In the supervisor, it is used to get information about the child and to make certain requests to it. In the child, it is used to provide zombie-fencing and to report liveness.

Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.

Constructor Details

#initialize(id, parent_pid) ⇒ Node

Returns a new instance of Node.

Parameters:

  • id (Integer)

    number of the fork. Used to build unique values, such as group client ids, wherever a fork needs a reference that distinguishes it from the other forks.

  • parent_pid (Integer)

    parent pid for zombie fencing



# File 'lib/karafka/swarm/node.rb', line 34

def initialize(id, parent_pid)
  @id = id
  @parent_pidfd = Pidfd.new(parent_pid)
end

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns id of the node. Useful for client.group.id assignment.

Returns:

  • (Integer)

    id of the node. Useful for client.group.id assignment



# File 'lib/karafka/swarm/node.rb', line 25

def id
  @id
end

#pid ⇒ Integer (readonly)

Returns pid of the node.

Returns:

  • (Integer)

    pid of the node



# File 'lib/karafka/swarm/node.rb', line 28

def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Note:

Parent API

Note:

Keep in mind that a process being alive does not mean it is healthy

Returns true if the node is alive or false if it has died.

Returns:

  • (Boolean)

    true if the node is alive or false if it has died



# File 'lib/karafka/swarm/node.rb', line 118

def alive?
  @pidfd.alive?
end

#cleanup ⇒ Object

Removes the dead process from the processes table



# File 'lib/karafka/swarm/node.rb', line 153

def cleanup
  @pidfd.cleanup
end

#healthy ⇒ Object

Note:

Child API

Indicates that this node is doing well



# File 'lib/karafka/swarm/node.rb', line 85

def healthy
  write('0')
end

#orphaned? ⇒ Boolean

Note:

Child API

Returns true if node is orphaned or false otherwise. Used for orphans detection.

Returns:

  • (Boolean)

    true if node is orphaned or false otherwise. Used for orphans detection.



# File 'lib/karafka/swarm/node.rb', line 124

def orphaned?
  !@parent_pidfd.alive?
end
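The idea behind orphan detection can be illustrated without Karafka's `Pidfd` wrapper. The sketch below swaps in a simpler, stdlib-only technique: the child remembers its parent's pid and compares it against `Process.ppid`, which changes once the parent exits and the child is re-parented. This is not how the node itself checks (it asks `@parent_pidfd`), and plain pid comparisons are exposed to pid reuse; all variable names here are illustrative.

```ruby
# Stand-in for orphan detection: compare the remembered parent pid with
# Process.ppid. Karafka's node uses its Pidfd wrapper instead.
result_reader, result_writer = IO.pipe

middle_pid = Process.fork do
  result_reader.close
  original_parent = Process.pid # pid the grandchild will remember

  Process.fork do
    orphaned = false

    # Poll for up to ~5 seconds until the parent pid changes
    50.times do
      orphaned = Process.ppid != original_parent
      break if orphaned

      sleep(0.1)
    end

    result_writer.write(orphaned ? 'orphaned' : 'not orphaned')
    result_writer.close
    exit!(0)
  end

  # The intermediate process exits right away, orphaning its child
  exit!(0)
end

result_writer.close
Process.wait(middle_pid)

# Blocks until the grandchild has written its verdict and closed the pipe
outcome = result_reader.read
result_reader.close

puts outcome
```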

#quiet ⇒ Object

Note:

Parent API

Sends SIGTSTP to the node



# File 'lib/karafka/swarm/node.rb', line 136

def quiet
  signal('TSTP')
end

#signal(signal) ⇒ Object

Sends provided signal to the node

Parameters:

  • signal (String)


# File 'lib/karafka/swarm/node.rb', line 148

def signal(signal)
  @pidfd.signal(signal)
end
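For readers unfamiliar with signaling forked processes, here is a minimal stdlib-only sketch of a parent sending `TERM` to a child. It uses `Process.kill` on a plain pid as a stand-in; the node itself delegates to `@pidfd.signal`, whose internals are not shown in this page. The pipe is only there to make sure the child has installed its trap before the signal is sent.

```ruby
# Stand-in for Node#signal: deliver a signal to a forked child by pid.
reader, writer = IO.pipe

pid = Process.fork do
  reader.close

  # Exit cleanly once TERM arrives
  Signal.trap('TERM') { exit!(0) }

  # Tell the parent that the trap is installed
  writer.write("ready\n")
  writer.close

  sleep # wait for the signal
end

writer.close
reader.gets # block until the child reports that it is ready
reader.close

Process.kill('TERM', pid)
_, exit_status = Process.wait2(pid)

puts "child exited with status #{exit_status.exitstatus}"
```

The same shape applies to `TSTP` and `KILL`, except that `KILL` cannot be trapped, so the child would not get a chance to exit cleanly.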

#start ⇒ Object

Note:

Parent API

Starts a new fork and:

  • stores pid and parent reference
  • makes sure reader pipe is closed
  • sets up liveness listener
  • recreates producer and web producer



# File 'lib/karafka/swarm/node.rb', line 45

def start
  @reader, @writer = IO.pipe

  # :nocov:
  @pid = ::Process.fork do
    # Close the old producer so it is not a subject to GC
    # While it was not opened in the parent, without explicit closing, there still could be
    # an attempt to close it when finalized, meaning it would be kept in memory.
    config.producer.close

    # Supervisor producer is closed, hence we need a new one here
    config.producer = ::WaterDrop::Producer.new do |p_config|
      p_config.kafka = Setup::AttributesMap.producer(kafka.dup)
      p_config.logger = config.logger
    end

    @pid = ::Process.pid
    @reader.close

    # Indicate we are alive right after start
    healthy

    swarm.node = self
    monitor.subscribe(liveness_listener)
    monitor.instrument('swarm.node.after_fork', caller: self)

    Karafka::Process.tags.add(:execution_mode, 'mode:swarm')
    Server.execution_mode = :swarm
    Server.run

    @writer.close
  end
  # :nocov:

  @writer.close
  @pidfd = Pidfd.new(@pid)
end
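The pipe handling in `start` follows a common fork pattern: the pipe is created before forking, then each side closes the end it does not use. The sketch below shows only that pattern with Ruby's standard library and leaves out everything Karafka-specific (producer recreation, listeners, `Server.run`). The `"0\n"` payload and the newline delimiter are assumptions made for illustration.

```ruby
# Minimal fork-plus-pipe sketch: the child reports, the parent reads.
reader, writer = IO.pipe

pid = Process.fork do
  # Child side: it only writes, so the read end is closed here
  reader.close

  # Report "healthy" right after start (payload format is assumed)
  writer.write("0\n")
  writer.close
  exit!(0)
end

# Parent side: it only reads, so the write end is closed here
writer.close

# Wait for the child so the example is deterministic, then read its report
Process.wait(pid)
report = reader.read
reader.close

puts "child #{pid} reported: #{report.inspect}"
```

Closing the unused ends matters: as long as any process holds the write end open, a reader waiting for end-of-file on the pipe will not see it.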

#status ⇒ Integer

Note:

Parent API

Note:

If several issues were reported, the one with the highest number is returned

Returns a status code that depends on the reported data:

  • -1 if the node did not report anything new
  • 0 if all is good
  • a positive number if there was a problem (indicates the error code)

Returns:

  • (Integer)

    status code that depends on the reported data: -1 if the node did not report anything new, 0 if all is good, or a positive number if there was a problem (indicates the error code)



# File 'lib/karafka/swarm/node.rb', line 106

def status
  result = read

  return -1 if result.nil?
  return -1 if result == false

  result.split("\n").map(&:to_i).max
end
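The aggregation step in `status` can be tried in isolation. The sketch below assumes that reports arrive as newline-separated numeric strings, which is what the `split("\n")` call implies; `aggregate_status` is a hypothetical helper name and not part of Karafka.

```ruby
# Hypothetical helper mirroring the aggregation logic of Node#status.
# `raw` stands in for whatever the private `read` returns: a String of
# newline-separated codes, or nil/false when nothing new was reported.
def aggregate_status(raw)
  return -1 if raw.nil? || raw == false

  raw.split("\n").map(&:to_i).max
end

nothing_new = aggregate_status(false)       # no report since the last read
all_good    = aggregate_status("0\n0\n")    # only healthy reports
worst_code  = aggregate_status("0\n3\n1\n") # several reports, highest code wins

puts [nothing_new, all_good, worst_code].inspect
```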

#stop ⇒ Object

Note:

Parent API

Sends SIGTERM to the node



# File 'lib/karafka/swarm/node.rb', line 130

def stop
  signal('TERM')
end

#terminate ⇒ Object

Note:

Parent API

Terminates the node by sending SIGKILL



# File 'lib/karafka/swarm/node.rb', line 142

def terminate
  signal('KILL')
end

#unhealthy(reason_code = '1') ⇒ Object

Note:

Child API

Note:

We convert this to string to normalize the API

Indicates that this node has failed

Parameters:

  • reason_code (Integer, String) (defaults to: '1')

    numeric code we want to use to indicate that we are not healthy. Anything greater than 0 will be considered not healthy. Useful if we want to have complex health-checking with reporting.



# File 'lib/karafka/swarm/node.rb', line 95

def unhealthy(reason_code = '1')
  write(reason_code.to_s)
end