Class: Karafka::Swarm::Supervisor

Inherits:
Object
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/swarm/supervisor.rb

Overview

Note:

Technically speaking supervisor is never in the running state because we do not want to have any sockets or anything else on it that could break under forking. It has its own “supervising” state from which it can go to the final shutdown.

Supervisor that starts forks and uses monitor to monitor them. Also handles shutdown of all the processes including itself.

In case any node dies, it will be restarted.

Instance Method Summary collapse

Constructor Details

#initializeSupervisor

Returns a new instance of Supervisor.



37
38
39
40
# File 'lib/karafka/swarm/supervisor.rb', line 37

def initialize
  @mutex = Mutex.new
  @queue = Processing::TimedQueue.new
end

Instance Method Details

#runObject

Creates needed number of forks, installs signals and starts supervision



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/karafka/swarm/supervisor.rb', line 43

def run
  # Validate the CLI provided options the same way as we do for the regular server
  cli_contract.validate!(activity_manager.to_h)

  # Close producer just in case. While it should not be used, we do not want even a
  # theoretical case since librdkafka is not thread-safe.
  # We close it prior to forking just to make sure, there is no issue with initialized
  # producer (should not be initialized but just in case)
  Karafka.producer.close

  Karafka::App.warmup

  manager.start

  process.on_sigint { stop }
  process.on_sigquit { stop }
  process.on_sigterm { stop }
  process.on_sigtstp { quiet }
  process.on_sigttin { signal('TTIN') }
  # Needed to be registered as we want to unlock on child changes
  process.on_sigchld {}
  process.on_any_active { unlock }
  process.supervise

  Karafka::App.supervise!

  loop do
    return if Karafka::App.terminated?

    lock
    control
  end

# If the cli contract validation failed reraise immediately and stop the process
rescue Karafka::Errors::InvalidConfigurationError => e
  raise e
# If anything went wrong during supervision, signal this and die
# Supervisor is meant to be thin and not cause any issues. If you encounter this case
# please report it as it should be considered critical
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    manager: manager,
    type: 'swarm.supervisor.error'
  )

  manager.terminate
  manager.cleanup

  raise e
end