Class: Karafka::Swarm::Node

Inherits:
Object
Defined in:
lib/karafka/swarm/node.rb

Overview

Note:

Some of these APIs are for the parent process only.

Note:

Keep in mind that this class is used in both the supervisor and the forks, with a slightly different role in each. In the supervisor, it is used to get information about the child and make certain requests to it. In the child, it is used to provide zombie fencing and report liveness.

Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.

Constructor Details

#initialize(id, parent_pid) ⇒ Node

Returns a new instance of Node.

Parameters:

  • id (Integer)

    number of the fork. Used to build unique values, such as group client ids, and anywhere else a fork needs a reference that distinguishes it from the other forks.

  • parent_pid (Integer)

    pid of the parent process, used for zombie fencing

# File 'lib/karafka/swarm/node.rb', line 34

def initialize(id, parent_pid)
  @id = id
  @parent_pidfd = Pidfd.new(parent_pid)
end

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns id of the node. Useful for client.group.id assignment.

Returns:

  • (Integer)

    id of the node. Useful for client.group.id assignment


# File 'lib/karafka/swarm/node.rb', line 25

def id
  @id
end

#pid ⇒ Integer (readonly)

Returns pid of the node.

Returns:

  • (Integer)

    pid of the node


# File 'lib/karafka/swarm/node.rb', line 28

def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Note:

Parent API

Note:

Keep in mind that a process being alive does not mean it is healthy.

Returns true if the node is alive or false if it has died.

Returns:

  • (Boolean)

    true if the node is alive or false if it has died

# File 'lib/karafka/swarm/node.rb', line 118

def alive?
  @pidfd.alive?
end
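
As a rough illustration of what a liveness probe does, the sketch below checks whether a process exists by PID. It does not use Karafka's Pidfd class; it swaps in the plain `Process.kill(0, pid)` technique from the Ruby standard library, and `process_alive?` is a hypothetical helper name. As the note above says, a positive result only means the process exists, not that it is healthy.

```ruby
# Hypothetical helper, not part of Karafka. Signal 0 delivers nothing;
# the kernel only checks that the process exists and may be signalled.
def process_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  # No process with this PID
  false
rescue Errno::EPERM
  # The process exists but belongs to another user
  true
end

# Fork a child that exits immediately, then reap it so its PID is released
child_pid = Process.fork { exit!(0) }
Process.wait(child_pid)

puts process_alive?(Process.pid)
puts process_alive?(child_pid)
```

A PID-based check like this can be fooled by PID reuse, and an exited child that has not been reaped yet still counts as existing, so treat it as a sketch of the idea only.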

#cleanup ⇒ Object

Removes the dead process from the processes table

# File 'lib/karafka/swarm/node.rb', line 153

def cleanup
  @pidfd.cleanup
end

#healthy ⇒ Object

Note:

Child API

Indicates that this node is doing well

# File 'lib/karafka/swarm/node.rb', line 85

def healthy
  write('0')
end

#orphaned? ⇒ Boolean

Note:

Child API

Returns true if the node is orphaned or false otherwise. Used for orphan detection.

Returns:

  • (Boolean)

    true if the node is orphaned or false otherwise. Used for orphan detection.

# File 'lib/karafka/swarm/node.rb', line 124

def orphaned?
  !@parent_pidfd.alive?
end
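
The idea behind orphan detection can be sketched without a pidfd. The example below is an assumption-laden stand-in, not Karafka's implementation: the hypothetical `OrphanCheck` class remembers the parent PID at construction and later compares it with `Process.ppid`, relying on the kernel re-parenting a child once its original parent is gone.

```ruby
# Hypothetical stand-in for orphan detection without pidfd
class OrphanCheck
  # parent_pid defaults to the parent of the current process
  def initialize(parent_pid = Process.ppid)
    @parent_pid = parent_pid
  end

  # When the original parent dies, the child is re-parented, so the
  # parent PID reported by the kernel no longer matches the stored one
  def orphaned?
    Process.ppid != @parent_pid
  end
end

check = OrphanCheck.new
puts check.orphaned?
```

A child process could call such a check periodically and shut itself down once it reports true, which is the zombie-fencing role described in the overview.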

#quiet ⇒ Object

Note:

Parent API

Sends SIGTSTP to the node

# File 'lib/karafka/swarm/node.rb', line 136

def quiet
  signal('TSTP')
end

#signal(signal) ⇒ Object

Sends provided signal to the node

Parameters:

  • signal (String)

# File 'lib/karafka/swarm/node.rb', line 148

def signal(signal)
  @pidfd.signal(signal)
end
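
To show how #quiet, #stop and #terminate relate to #signal, here is a minimal sketch using only the standard library. `ForkHandle` is a hypothetical class, and it delivers signals by PID with `Process.kill` rather than through Karafka's Pidfd, so it illustrates the mapping of method to signal name and nothing more.

```ruby
# Hypothetical wrapper, not Karafka's implementation
class ForkHandle
  attr_reader :pid

  def initialize(pid)
    @pid = pid
  end

  # Returns true if the signal was sent, false if the process is gone
  def signal(name)
    Process.kill(name, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  def quiet
    signal('TSTP')
  end

  def stop
    signal('TERM')
  end

  def terminate
    signal('KILL')
  end
end

# Fork a child that sleeps forever, kill it, and reap it
handle = ForkHandle.new(Process.fork { sleep })
handle.terminate
_, exit_status = Process.wait2(handle.pid)

puts exit_status.termsig == Signal.list['KILL']
```

Once the child has been reaped, further calls such as `handle.stop` return false in this sketch because the PID no longer exists.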

#start ⇒ Object

Note:

Parent API

Starts a new fork and:

  • stores pid and parent reference
  • makes sure reader pipe is closed
  • sets up liveness listener
  • recreates producer and web producer

# File 'lib/karafka/swarm/node.rb', line 45

def start
  @reader, @writer = IO.pipe

  # :nocov:
  @pid = ::Process.fork do
    # Close the old producer so it is not a subject to GC
    # While it was not opened in the parent, without explicit closing, there still could be
    # an attempt to close it when finalized, meaning it would be kept in memory.
    config.producer.close

    # Supervisor producer is closed, hence we need a new one here
    config.producer = ::WaterDrop::Producer.new do |p_config|
      p_config.kafka = Setup::AttributesMap.producer(kafka.dup)
      p_config.logger = config.logger
    end

    @pid = ::Process.pid
    @reader.close

    # Indicate we are alive right after start
    healthy

    swarm.node = self
    monitor.subscribe(liveness_listener)
    monitor.instrument('swarm.node.after_fork', caller: self)

    Karafka::Process.tags.add(:execution_mode, 'mode:swarm')
    Server.execution_mode = :swarm
    Server.run

    @writer.close
  end
  # :nocov:

  @writer.close
  @pidfd = Pidfd.new(@pid)
end
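
The pipe handling in the source above follows a common fork pattern: each side closes the pipe end it does not use. The sketch below isolates that pattern with the standard library only. The variable names and the `"0\n"` payload are illustrative assumptions, and none of the Karafka setup (producer, listeners, server) is reproduced.

```ruby
# A minimal sketch of the fork-and-pipe pattern; all names are illustrative
reader, writer = IO.pipe

child_pid = Process.fork do
  # The child only writes, so it drops its copy of the read end
  reader.close
  writer.write("0\n")
  writer.close
  # exit! skips at_exit handlers inherited from the parent
  exit!(0)
end

# The parent only reads, so it drops its copy of the write end
writer.close
# read returns once every copy of the write end has been closed
report = reader.read
reader.close
Process.wait(child_pid)

puts report.inspect
```

Closing the unused ends matters: if the parent kept its writer open, a blocking read like the one here would never see end-of-file.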

#status ⇒ Integer

Note:

Parent API

Note:

If several issues were reported, it picks the one with the highest number.

Returns a status code depending on the data: -1 if the node did not report anything new, 0 if all is good, or a positive number if there was a problem (the number indicates the error code).

Returns:

  • (Integer)

    status code depending on the data:

      • -1 if the node did not report anything new
      • 0 if all is good
      • a positive number if there was a problem (indicates the error code)

# File 'lib/karafka/swarm/node.rb', line 106

def status
  result = read

  return -1 if result.nil?
  return -1 if result == false

  result.split("\n").map(&:to_i).max
end
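
The reduction performed by #status can be tried in isolation. In the sketch below, `status_from` is a hypothetical helper that takes the raw value as an argument instead of calling the private `read`, which is not shown on this page. The sample inputs assume newline-separated numeric reports, as the `split("\n")` in the source suggests.

```ruby
# Hypothetical helper mirroring the logic of #status on a given raw value
def status_from(raw)
  # Nothing new was reported
  return -1 if raw.nil? || raw == false

  # Several reports may be pending; the highest code wins
  raw.split("\n").map(&:to_i).max
end

puts status_from(nil)         # prints -1
puts status_from("0\n")       # prints 0
puts status_from("0\n3\n1\n") # prints 3
```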

#stop ⇒ Object

Note:

Parent API

Sends SIGTERM to the node

# File 'lib/karafka/swarm/node.rb', line 130

def stop
  signal('TERM')
end

#terminate ⇒ Object

Note:

Parent API

Terminates the node by sending SIGKILL

# File 'lib/karafka/swarm/node.rb', line 142

def terminate
  signal('KILL')
end

#unhealthy(reason_code = '1') ⇒ Object

Note:

Child API

Note:

The reason code is converted to a string to normalize the API

Indicates that this node has failed

Parameters:

  • reason_code (Integer, String) (defaults to: '1')

    numeric code we want to use to indicate that we are not healthy. Anything bigger than 0 will be considered not healthy. Useful if we want to have complex health-checking with reporting.

# File 'lib/karafka/swarm/node.rb', line 95

def unhealthy(reason_code = '1')
  write(reason_code.to_s)
end
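
The child-side reporting in #healthy and #unhealthy can be sketched as follows. `HealthReporter` is a hypothetical class, and its private `write` is an assumption: the real `write` is not shown on this page, so the newline framing and the non-blocking write here are guesses chosen to match the `split("\n")` seen in #status.

```ruby
# Hypothetical reporter; only healthy/unhealthy mirror the documented methods
class HealthReporter
  def initialize(writer)
    @writer = writer
  end

  def healthy
    write('0')
  end

  # Accepts an Integer or a String and normalizes it to a String
  def unhealthy(reason_code = '1')
    write(reason_code.to_s)
  end

  private

  # Assumption: one report per line, written without blocking
  def write(content)
    @writer.write_nonblock("#{content}\n")
    true
  rescue IO::WaitWritable, Errno::EPIPE, IOError
    false
  end
end

reader, writer = IO.pipe
reporter = HealthReporter.new(writer)

reporter.unhealthy(3)
reporter.unhealthy
reporter.healthy

reports = reader.read_nonblock(1024)
puts reports.inspect
```

Here an Integer and the default String argument end up in the same textual form, which is the normalization the note above refers to.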