Class: Karafka::Swarm::Node

Inherits:
Object
Defined in:
lib/karafka/swarm/node.rb

Overview

Note:

Some of these APIs are for the parent process only.

Note:

Keep in mind that this class is used both in the forks and in the supervisor, and it plays a slightly different role in each. In the supervisor, it is used to get information about the child and to send certain requests to it. In the child, it is used to provide zombie fencing and to report liveness.

Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.

Constructor Details

#initialize(id, parent_pid) ⇒ Node

Returns a new instance of Node.

Parameters:

  • id (Integer)

    Number of the fork. Used to make group client ids, and anything else that needs a unique per-fork reference, unique across the swarm.

  • parent_pid (Integer)

    Parent PID, used for zombie fencing.



# File 'lib/karafka/swarm/node.rb', line 46

def initialize(id, parent_pid)
  @id = id
  @parent_pid = parent_pid
  @mutex = Mutex.new
  @alive = nil
end

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns id of the node. Useful for client.group.id assignment.

Returns:

  • (Integer)

    id of the node. Useful for client.group.id assignment



# File 'lib/karafka/swarm/node.rb', line 25

def id
  @id
end

#pid ⇒ Integer (readonly)

Returns pid of the node.

Returns:

  • (Integer)

    pid of the node



# File 'lib/karafka/swarm/node.rb', line 28

def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Note:

Parent API

Note:

Keep in mind that a process being alive does not mean it is healthy.

Returns true if the node is alive or false if it has died.

Returns:

  • (Boolean)

    true if the node is alive or false if it has died



# File 'lib/karafka/swarm/node.rb', line 153

def alive?
  # Don't try to waitpid on ourselves - just check if process exists
  return true if @pid == ::Process.pid

  @mutex.synchronize do
    # Return cached result if we've already determined the process is dead
    return false if @alive == false

    begin
      # Try to reap the process without blocking. If it returns the pid,
      # the process has exited (zombie). If it returns nil, still running.
      result = ::Process.waitpid(@pid, ::Process::WNOHANG)

      if result
        # Process has exited and we've reaped it
        @alive = false
        false
      else
        # Process is still running
        true
      end
    rescue Errno::ECHILD
      # Process doesn't exist or already reaped
      @alive = false
      false
    rescue Errno::ESRCH
      # Process doesn't exist
      @alive = false
      false
    end
  end
end
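The liveness check above relies on a non-blocking waitpid. The standalone sketch below is plain Ruby for a Unix-like system, not Karafka code, and the child_alive? helper is hypothetical. It mirrors the same idea: with WNOHANG, waitpid returns nil while the child is running and the child's pid once it has exited, and Errno::ECHILD means there is nothing left to reap. Unlike Node, it does not cache the result.

```ruby
# Hypothetical helper mirroring the waitpid-based check in Node#alive?.
# Unix-only: Process.fork is not available on Windows.
def child_alive?(pid)
  # WNOHANG returns immediately: nil while the child is still running,
  # the child's pid once it has exited (and has now been reaped)
  ::Process.waitpid(pid, ::Process::WNOHANG).nil?
rescue Errno::ECHILD
  # Nothing to reap: already reaped earlier or not our child
  false
end

sleeper = ::Process.fork { sleep 30 }
quick = ::Process.fork { exit!(0) }

running = child_alive?(sleeper)

# Poll until the short-lived child has exited and been reaped
sleep 0.05 while child_alive?(quick)

# A second check on the same pid hits ECHILD, so we still get false
after_reap = child_alive?(quick)

puts running    # => true
puts after_reap # => false

# Clean up the long-running child
::Process.kill('KILL', sleeper)
::Process.wait(sleeper)
```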

#cleanup ⇒ Boolean

Reaps the dead process, removing it from the process table.

Returns:

  • (Boolean)

    true if process was reaped, false if still running or already reaped



# File 'lib/karafka/swarm/node.rb', line 223

def cleanup
  @mutex.synchronize do
    # If we've already marked it as dead (reaped in alive?), nothing to do
    return false if @alive == false

    begin
      # WNOHANG means don't block if process hasn't exited yet
      result = ::Process.waitpid(@pid, ::Process::WNOHANG)

      if result
        # Process exited and was reaped
        @alive = false
        true
      else
        # Process is still running
        false
      end
    rescue Errno::ECHILD
      # Process already reaped or doesn't exist, which is fine
      @alive = false
      false
    end
  end
end
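Because alive? and cleanup both call waitpid, whichever runs first after the child exits does the reaping, and the shared @alive flag keeps the other from reporting it again. The sketch below illustrates that interaction with a hypothetical, trimmed-down MiniNode class; it is an approximation of the pattern shown above, not the real Karafka::Swarm::Node.

```ruby
# Hypothetical stand-in for Karafka::Swarm::Node (not the real class):
# alive? and cleanup share one mutex-guarded @alive flag.
# Unix-only: Process.fork is not available on Windows.
class MiniNode
  def initialize(pid)
    @pid = pid
    @mutex = Mutex.new
    @alive = nil # nil = unknown, false = known to be dead (already reaped)
  end

  # true while the child runs; reaps it once it has exited
  def alive?
    @mutex.synchronize do
      return false if @alive == false

      if ::Process.waitpid(@pid, ::Process::WNOHANG)
        @alive = false
        false
      else
        true
      end
    rescue Errno::ECHILD
      @alive = false
      false
    end
  end

  # true only when this very call reaped the child
  def cleanup
    @mutex.synchronize do
      return false if @alive == false

      if ::Process.waitpid(@pid, ::Process::WNOHANG)
        @alive = false
        true
      else
        false
      end
    rescue Errno::ECHILD
      @alive = false
      false
    end
  end
end

# Case 1: alive? reaps the exited child, so cleanup has nothing left to do
a = MiniNode.new(::Process.fork { exit!(0) })
sleep 0.05 while a.alive?
first_cleanup = a.cleanup

# Case 2: cleanup itself reaps the child (it returns false while it still runs)
b = MiniNode.new(::Process.fork { exit!(0) })
reaped = b.cleanup
until reaped
  sleep 0.05
  reaped = b.cleanup
end
second_cleanup = b.cleanup

puts first_cleanup  # => false
puts reaped         # => true
puts second_cleanup # => false
```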

#healthy ⇒ Object

Note:

Child API

Indicates that this node is doing well.



# File 'lib/karafka/swarm/node.rb', line 120

def healthy
  write('0')
end

#orphaned? ⇒ Boolean

Note:

Child API

Returns true if the node is orphaned or false otherwise. Used for orphan detection.

Returns:

  • (Boolean)

    true if the node is orphaned or false otherwise. Used for orphan detection.



# File 'lib/karafka/swarm/node.rb', line 188

def orphaned?
  ::Process.ppid != @parent_pid
end
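The orphan check works because a process whose parent exits is re-parented (to init or to a subreaper), so Process.ppid stops matching the pid recorded at fork time. The following is a plain-Ruby sketch of that effect on a Unix-like system, not Karafka code: a hypothetical "middle" process plays the supervisor, forks a grandchild that remembers the middle process's pid, and then exits.

```ruby
# Sketch of the ppid comparison behind Node#orphaned? (not Karafka code).
# Unix-only: Process.fork is not available on Windows.
reader, writer = IO.pipe

middle = ::Process.fork do
  reader.close
  parent_pid = ::Process.pid # the value Node.new(id, parent_pid) would get

  ::Process.fork do
    orphaned = -> { ::Process.ppid != parent_pid }

    # Poll for up to ~5 seconds for the re-parenting to become visible
    50.times do
      break if orphaned.call

      sleep 0.1
    end

    writer.puts(orphaned.call)
    writer.close
    exit!(0)
  end

  writer.close
  exit!(0) # the middle process exits, orphaning the grandchild
end

writer.close
::Process.wait(middle)
result = reader.read.strip # returns once the grandchild closes its write end
reader.close

puts result # => true
```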

#quiet ⇒ Object

Note:

Parent API

Sends SIGTSTP to the node.



# File 'lib/karafka/swarm/node.rb', line 200

def quiet
  signal('TSTP')
end

#signal(signal) ⇒ Boolean

Sends the provided signal to the node.

Parameters:

  • signal (String)

    name of the signal to send (for example 'TSTP', 'TERM' or 'KILL')

Returns:

  • (Boolean)

    true if signal was sent, false if process doesn’t exist



# File 'lib/karafka/swarm/node.rb', line 213

def signal(signal)
  ::Process.kill(signal, @pid)
  true
rescue Errno::ESRCH
  # Process doesn't exist
  false
end
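quiet, stop and terminate are thin wrappers that send TSTP, TERM and KILL through signal, which turns a missing process (Errno::ESRCH) into a false return value instead of an exception. A minimal plain-Ruby sketch of that behavior follows; send_signal is a hypothetical helper, and KILL is used so the forked child cannot run any cleanup handlers.

```ruby
# Hypothetical helper mirroring Node#signal: send a signal to a pid and
# return false instead of raising when that process no longer exists.
# Unix-only: Process.fork is not available on Windows.
def send_signal(signal, pid)
  ::Process.kill(signal, pid)
  true
rescue Errno::ESRCH
  false
end

pid = ::Process.fork { sleep 30 }

sent = send_signal('KILL', pid)  # what Node#terminate sends
_, status = ::Process.wait2(pid) # reap the child and get its exit status

missing = send_signal('KILL', pid) # the pid is gone after reaping

puts sent                                  # => true
puts status.termsig == Signal.list['KILL'] # => true
puts missing                               # => false
```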

#start ⇒ Object

Note:

Parent API

Starts a new fork and:

  • stores the pid and parent reference
  • makes sure the reader pipe is closed
  • sets up the liveness listener
  • recreates the producer and web producer



# File 'lib/karafka/swarm/node.rb', line 59

def start
  @reader, @writer = IO.pipe
  # Reset alive status when starting/restarting a node
  # nil means unknown status - will check with waitpid
  @mutex.synchronize { @alive = nil }

  # :nocov:
  @pid = ::Process.fork do
    # Close the old producer so it is not a subject to GC
    # While it was not opened in the parent, without explicit closing, there still could be
    # an attempt to close it when finalized, meaning it would be kept in memory.
    config.producer.close

    old_producer = config.producer
    old_producer_config = old_producer.config

    # Supervisor producer is closed, hence we need a new one here
    config.producer = WaterDrop::Producer.new do |p_config|
      p_config.kafka = Setup::AttributesMap.producer(kafka.dup)
      p_config.logger = config.logger

      old_producer_config.to_h.each do |key, value|
        next if SKIPPABLE_NEW_PRODUCER_ATTRIBUTES.include?(key)

        p_config.public_send("#{key}=", value)
      end

      # Namespaced attributes need to be migrated directly on their config node
      old_producer_config.oauth.to_h.each do |key, value|
        p_config.oauth.public_send("#{key}=", value)
      end
    end

    @pid = ::Process.pid
    @reader.close

    # Certain features need to be reconfigured / reinitialized after fork in Pro
    Pro::Loader.post_fork(config, old_producer) if Karafka.pro?

    # Indicate we are alive right after start
    healthy

    swarm.node = self
    monitor.subscribe(liveness_listener)
    monitor.instrument('swarm.node.after_fork', caller: self)

    Karafka::Process.tags.add(:execution_mode, 'mode:swarm')
    Karafka::Process.tags.add(:swarm_nodeid, "node:#{@id}")

    Server.execution_mode.swarm!
    Server.run

    @writer.close
  end
  # :nocov:

  @writer.close
end
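Stripped of the Karafka-specific setup (producer re-creation, Pro hooks, Server.run), start follows a common fork-and-pipe pattern: each side closes the pipe end it does not use, the child reports health on the write end, and the parent reads from the other end. The sketch below shows only that pattern in plain Ruby; the newline-terminated "0" report format is an assumption, since the private write helper is not shown on this page.

```ruby
# Minimal sketch of the fork + pipe pattern used by Node#start.
# Unix-only: Process.fork is not available on Windows.
reader, writer = IO.pipe

pid = ::Process.fork do
  # Child: it only writes, so drop the read end
  reader.close
  writer.write("0\n") # assumed format: report "healthy" right after start
  # ... the real node would run its server loop here ...
  writer.close
  exit!(0)
end

# Parent: it only reads, so drop the write end; otherwise EOF never arrives
writer.close

report = reader.read # blocks until the child closes its write end
::Process.wait(pid)
reader.close

puts report.inspect # => "0\n"
```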

#status ⇒ Integer

Note:

Parent API

Note:

If several issues were reported, the one with the highest code is returned.

Returns a status code depending on the data reported by the node:

  • -1 if the node did not report anything new
  • 0 if all is good
  • a positive number if there was a problem (the number is the error code)

Returns:

  • (Integer)

    -1 if the node did not report anything new, 0 if all is good, or a positive error code if there was a problem



# File 'lib/karafka/swarm/node.rb', line 141

def status
  result = read

  return -1 if result.nil?
  return -1 if result == false

  result.split("\n").map(&:to_i).max
end
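The parsing step in status can be tried in isolation. In the sketch below, parse_status is a hypothetical standalone version. Its raw argument stands in for whatever the private read returned: nil or false when nothing new arrived, otherwise newline-separated codes. The highest code wins, so a single error outweighs any number of healthy reports.

```ruby
# Hypothetical standalone version of the parsing done in Node#status.
def parse_status(raw)
  # Nothing new was read from the node
  return -1 if raw.nil? || raw == false

  # Several reports may be buffered; the highest code wins
  raw.split("\n").map(&:to_i).max
end

puts parse_status(nil)         # => -1
puts parse_status(false)       # => -1
puts parse_status("0\n")       # => 0
puts parse_status("0\n3\n1\n") # => 3
```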

#stop ⇒ Object

Note:

Parent API

Sends SIGTERM to the node.



# File 'lib/karafka/swarm/node.rb', line 194

def stop
  signal('TERM')
end

#terminate ⇒ Object

Note:

Parent API

Terminates the node by sending SIGKILL.



# File 'lib/karafka/swarm/node.rb', line 206

def terminate
  signal('KILL')
end

#unhealthy(reason_code = '1') ⇒ Object

Note:

Child API

Note:

The reason code is converted to a string to normalize the API.

Indicates that this node has failed.

Parameters:

  • reason_code (Integer, String) (defaults to: '1')

    Numeric code used to indicate that the node is not healthy. Anything greater than 0 is considered unhealthy. Useful if you want complex health checking with reporting.



# File 'lib/karafka/swarm/node.rb', line 130

def unhealthy(reason_code = '1')
  write(reason_code.to_s)
end
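healthy and unhealthy only write a code to the pipe, and the supervisor later reads whatever has accumulated. The sketch below keeps both pipe ends in one process for brevity. The report and drain lambdas are hypothetical stand-ins for the private write and read helpers, which this page does not show. The newline-terminated format and the non-blocking read are assumptions.

```ruby
reader, writer = IO.pipe
writer.sync = true # flush each report immediately

# Child side (hypothetical): mirrors healthy / unhealthy(reason_code)
report = ->(code) { writer.write("#{code}\n") }

# Parent side (hypothetical): non-blocking read, nil when nothing new arrived
drain = lambda do
  data = reader.read_nonblock(1024, exception: false)
  data.is_a?(String) ? data : nil # :wait_readable (empty) or nil (EOF)
end

first = drain.call # nothing has been written yet

report.call(0) # like healthy
report.call(2) # like unhealthy('2')
report.call(0)

worst = drain.call.split("\n").map(&:to_i).max

puts first.inspect # => nil
puts worst         # => 2
```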