Class: Karafka::Swarm::Node
- Inherits: Object
  - Object
  - Karafka::Swarm::Node
- Defined in: lib/karafka/swarm/node.rb
Overview
Some of these APIs are for the parent process only.
Keep in mind that this class can be used in both forks and the supervisor, and it has a slightly different role in each. In the supervisor, it is used to get information about the child and to make certain requests to it. In the child, it is used to provide zombie-fencing and to report liveness.
Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.
Instance Attribute Summary
-
#id ⇒ Integer
readonly
Id of the node.
-
#pid ⇒ Integer
readonly
Pid of the node.
Instance Method Summary
-
#alive? ⇒ Boolean
True if the node is alive, false if it has died.
-
#cleanup ⇒ Boolean
Removes the dead process from the processes table.
-
#healthy ⇒ Object
Indicates that this node is doing well.
-
#initialize(id, parent_pid) ⇒ Node
constructor
A new instance of Node.
-
#orphaned? ⇒ Boolean
True if node is orphaned or false otherwise.
-
#quiet ⇒ Object
Sends SIGTSTP to the node.
-
#signal(signal) ⇒ Boolean
Sends provided signal to the node.
-
#start ⇒ Object
Starts a new fork and: stores the pid and parent reference, makes sure the reader pipe is closed, sets up the liveness listener, and recreates the producer and web producer.
-
#status ⇒ Integer
Returns a status code depending on the data: -1 if the node did not report anything new, 0 if all is good, or a positive number if there was a problem (the number indicates the error code).
-
#stop ⇒ Object
Sends SIGTERM to the node.
-
#terminate ⇒ Object
Terminates the node by sending SIGKILL.
-
#unhealthy(reason_code = '1') ⇒ Object
Indicates that this node has failed.
Constructor Details
#initialize(id, parent_pid) ⇒ Node
Returns a new instance of Node.
    # File 'lib/karafka/swarm/node.rb', line 46
    def initialize(id, parent_pid)
      @id = id
      @parent_pid = parent_pid
      @mutex = Mutex.new
      @alive = nil
    end
Instance Attribute Details
#id ⇒ Integer (readonly)
Returns id of the node. Useful for client.group.id assignment.
    # File 'lib/karafka/swarm/node.rb', line 25
    def id
      @id
    end
#pid ⇒ Integer (readonly)
Returns pid of the node.
    # File 'lib/karafka/swarm/node.rb', line 28
    def pid
      @pid
    end
Instance Method Details
#alive? ⇒ Boolean
Parent API
Keep in mind that a process being alive does not mean it is healthy.
Returns true if the node is alive, false if it has died.
    # File 'lib/karafka/swarm/node.rb', line 153
    def alive?
      # Don't try to waitpid on ourselves - just check if process exists
      return true if @pid == ::Process.pid

      @mutex.synchronize do
        # Return cached result if we've already determined the process is dead
        return false if @alive == false

        begin
          # Try to reap the process without blocking. If it returns the pid,
          # the process has exited (zombie). If it returns nil, still running.
          result = ::Process.waitpid(@pid, ::Process::WNOHANG)

          if result
            # Process has exited and we've reaped it
            @alive = false
            false
          else
            # Process is still running
            true
          end
        rescue Errno::ECHILD
          # Process doesn't exist or already reaped
          @alive = false
          false
        rescue Errno::ESRCH
          # Process doesn't exist
          @alive = false
          false
        end
      end
    end
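The non-blocking reaping used by `alive?` can be tried outside Karafka. The following is a minimal standalone sketch, not the library's code: `child_running?` is a hypothetical helper name, and the sketch leaves out the mutex and the cached `@alive` flag.

```ruby
# Hypothetical helper (not Karafka API): true while the forked child is still running.
def child_running?(pid)
  # With WNOHANG, waitpid returns nil if the child has not exited yet,
  # or the pid once the child has exited and has been reaped.
  ::Process.waitpid(pid, ::Process::WNOHANG).nil?
rescue Errno::ECHILD
  # Not our child, or it was already reaped by an earlier call
  false
end

pid = fork { sleep } # child that sleeps until it is signalled
running_before = child_running?(pid)

::Process.kill('KILL', pid)
sleep(0.01) while child_running?(pid) # poll until the child has been reaped
running_after = child_running?(pid)   # ECHILD is raised internally and rescued
```

Once a child has been reaped, any further `waitpid` on the same pid raises `Errno::ECHILD`, which is why the real method caches the "dead" result instead of asking the OS again.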
#cleanup ⇒ Boolean
Removes the dead process from the processes table
    # File 'lib/karafka/swarm/node.rb', line 223
    def cleanup
      @mutex.synchronize do
        # If we've already marked it as dead (reaped in alive?), nothing to do
        return false if @alive == false

        begin
          # WNOHANG means don't block if process hasn't exited yet
          result = ::Process.waitpid(@pid, ::Process::WNOHANG)

          if result
            # Process exited and was reaped
            @alive = false
            true
          else
            # Process is still running
            false
          end
        rescue Errno::ECHILD
          # Process already reaped or doesn't exist, which is fine
          @alive = false
          false
        end
      end
    end
#healthy ⇒ Object
Child API
Indicates that this node is doing well
    # File 'lib/karafka/swarm/node.rb', line 120
    def healthy
      write('0')
    end
#orphaned? ⇒ Boolean
Child API
Returns true if node is orphaned or false otherwise. Used for orphans detection.
    # File 'lib/karafka/swarm/node.rb', line 188
    def orphaned?
      ::Process.ppid != @parent_pid
    end
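Orphan detection relies on the operating system re-parenting a child once its original parent exits, so the value of `Process.ppid` changes. A minimal standalone sketch of the comparison, run from a child whose parent is still alive; the top-level `orphaned?` helper and the pipe used to report the result are illustrative assumptions, not Karafka code:

```ruby
# Hypothetical helper (not Karafka API): a child is considered orphaned when its
# current parent pid no longer matches the pid recorded before the fork.
def orphaned?(expected_parent_pid)
  ::Process.ppid != expected_parent_pid
end

supervisor_pid = ::Process.pid
reader, writer = IO.pipe

pid = fork do
  reader.close
  # The supervisor is still alive at this point, so the child is not orphaned
  writer.write(orphaned?(supervisor_pid).to_s)
  writer.close
  exit!(0)
end

writer.close
child_report = reader.read # "false"
reader.close
::Process.wait(pid)
```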
#quiet ⇒ Object
Parent API
Sends SIGTSTP to the node.
    # File 'lib/karafka/swarm/node.rb', line 200
    def quiet
      signal('TSTP')
    end
#signal(signal) ⇒ Boolean
Sends provided signal to the node
    # File 'lib/karafka/swarm/node.rb', line 213
    def signal(signal)
      ::Process.kill(signal, @pid)
      true
    rescue Errno::ESRCH
      # Process doesn't exist
      false
    end
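The boolean return value lets a caller tell "signal delivered" apart from "no such process" without handling exceptions. A minimal standalone sketch of the same pattern; `safe_signal` is a hypothetical name, and the example assumes the pid of the reaped child is not reused by the OS in the meantime:

```ruby
# Hypothetical helper (not Karafka API): sends a signal and reports whether
# the target process existed.
def safe_signal(pid, signal)
  ::Process.kill(signal, pid)
  true
rescue Errno::ESRCH
  # No process with this pid
  false
end

pid = fork { exit!(0) }
::Process.wait(pid) # reap the child so that its pid no longer exists

delivered_to_self = safe_signal(::Process.pid, 0) # signal 0 only checks existence
delivered_to_dead = safe_signal(pid, 'TERM')      # assumes the pid was not reused
```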
#start ⇒ Object
Parent API
Starts a new fork and:
- stores pid and parent reference
- makes sure reader pipe is closed
- sets up liveness listener
- recreates producer and web producer
    # File 'lib/karafka/swarm/node.rb', line 59
    def start
      @reader, @writer = IO.pipe
      # Reset alive status when starting/restarting a node
      # nil means unknown status - will check with waitpid
      @mutex.synchronize { @alive = nil }

      # :nocov:
      @pid = ::Process.fork do
        # Close the old producer so it is not a subject to GC
        # While it was not opened in the parent, without explicit closing, there still could be
        # an attempt to close it when finalized, meaning it would be kept in memory.
        config.producer.close

        old_producer = config.producer
        old_producer_config = old_producer.config

        # Supervisor producer is closed, hence we need a new one here
        config.producer = WaterDrop::Producer.new do |p_config|
          p_config.kafka = Setup::AttributesMap.producer(kafka.dup)
          p_config.logger = config.logger

          old_producer_config.to_h.each do |key, value|
            next if SKIPPABLE_NEW_PRODUCER_ATTRIBUTES.include?(key)

            p_config.public_send("#{key}=", value)
          end

          # Namespaced attributes need to be migrated directly on their config node
          old_producer_config.oauth.to_h.each do |key, value|
            p_config.oauth.public_send("#{key}=", value)
          end
        end

        @pid = ::Process.pid
        @reader.close

        # Certain features need to be reconfigured / reinitialized after fork in Pro
        Pro::Loader.post_fork(config, old_producer) if Karafka.pro?

        # Indicate we are alive right after start
        healthy

        swarm.node = self
        monitor.subscribe(liveness_listener)
        monitor.instrument('swarm.node.after_fork', caller: self)

        Karafka::Process.tags.add(:execution_mode, 'mode:swarm')
        Karafka::Process.tags.add(:swarm_nodeid, "node:#{@id}")

        Server.execution_mode.swarm!
        Server.run

        @writer.close
      end
      # :nocov:

      @writer.close
    end
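The pipe handling in `start` follows the usual fork pattern in which each side closes the end it does not use. A minimal standalone sketch with no Karafka or WaterDrop involved; the two report values are made up for illustration, and the newline-terminated format is an assumption based on `status` splitting its input on `"\n"`:

```ruby
reader, writer = IO.pipe

pid = fork do
  # The child only writes, so it closes the reading end
  reader.close
  writer.write("0\n") # e.g. "all good"
  writer.write("3\n") # e.g. a made-up error code
  writer.close
  exit!(0)
end

# The parent only reads, so it closes the writing end. Without this, the read
# below would never see EOF because one writer would still be open.
writer.close

reports = reader.read.split("\n") # blocks until the child closes its end
reader.close
::Process.wait(pid)
```

The blocking `read` is used here only to keep the example deterministic; a supervisor that polls many nodes would more likely read without blocking.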
#status ⇒ Integer
Parent API
If several issues were reported, it picks the one with the highest number.
Returns a status code depending on the data:
- -1 if the node did not report anything new
- 0 if all is good
- a positive number if there was a problem (the number indicates the error code)
    # File 'lib/karafka/swarm/node.rb', line 141
    def status
      result = read

      return -1 if result.nil?
      return -1 if result == false

      result.split("\n").map(&:to_i).max
    end
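The private `read` method is not shown on this page. The sketch below assumes it performs a non-blocking read from the pipe and yields nothing usable when no report is pending; `poll_status` is a hypothetical standalone helper that mirrors the decoding rules of `status` under that assumption.

```ruby
# Hypothetical helper (not Karafka API). Assumes reports are newline-terminated
# integers written to a pipe.
def poll_status(reader)
  # exception: false makes read_nonblock return :wait_readable when the pipe
  # is empty and nil at EOF, instead of raising
  raw = reader.read_nonblock(1024, exception: false)

  return -1 if raw.nil? || raw == :wait_readable

  # Several reports may be buffered at once; the highest code wins
  raw.split("\n").map(&:to_i).max
end

reader, writer = IO.pipe

nothing_yet = poll_status(reader) # -1, nothing was written so far
writer.write("0\n2\n1\n")
worst = poll_status(reader)       # 2, the highest of the buffered codes
drained = poll_status(reader)     # -1, the buffer was already consumed
```

Taking the maximum means a single error report is not masked by healthy reports that arrived in the same polling window.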
#stop ⇒ Object
Parent API
Sends SIGTERM to the node.
    # File 'lib/karafka/swarm/node.rb', line 194
    def stop
      signal('TERM')
    end
#terminate ⇒ Object
Parent API
Terminates the node by sending SIGKILL.
    # File 'lib/karafka/swarm/node.rb', line 206
    def terminate
      signal('KILL')
    end
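Since `stop` sends TERM and `terminate` sends KILL, a caller can combine them into a graceful shutdown with a forced fallback. The sketch below is one possible policy written against plain pids; `stop_with_escalation` and its `grace_seconds` parameter are hypothetical, and this is not a description of how the Karafka supervisor schedules shutdowns.

```ruby
# Hypothetical helper (not Karafka API): asks a child to stop with TERM and
# falls back to KILL if it has not exited within grace_seconds.
def stop_with_escalation(pid, grace_seconds: 1.0)
  ::Process.kill('TERM', pid)
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + grace_seconds

  # waitpid with WNOHANG returns nil while the child is still running
  until ::Process.waitpid(pid, ::Process::WNOHANG)
    if ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) > deadline
      ::Process.kill('KILL', pid)
      ::Process.waitpid(pid) # blocking reap after the forced kill
      return :killed
    end

    sleep(0.01)
  end

  :stopped
end
```

The helper returns `:stopped` when the child exits within the grace period and `:killed` when the fallback was needed, so the caller can log or count forced terminations.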
#unhealthy(reason_code = '1') ⇒ Object
Child API
The reason code is converted to a string to normalize the API.
Indicates that this node has failed.
    # File 'lib/karafka/swarm/node.rb', line 130
    def unhealthy(reason_code = '1')
      write(reason_code.to_s)
    end