Class: Karafka::Swarm::Supervisor
- Inherits:
-
Object
- Object
- Karafka::Swarm::Supervisor
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/swarm/supervisor.rb
Overview
Note:
Technically speaking supervisor is never in the running state because we do not want to have any sockets or anything else on it that could break under forking. It has its own “supervising” state from which it can go to the final shutdown.
Supervisor that starts forks and uses monitor to monitor them. Also handles shutdown of all the processes including itself.
In case any node dies, it will be restarted.
Instance Method Summary collapse
-
#initialize ⇒ Supervisor
constructor
A new instance of Supervisor.
-
#run ⇒ Object
Creates needed number of forks, installs signals and starts supervision.
Constructor Details
#initialize ⇒ Supervisor
Returns a new instance of Supervisor.
37 38 39 40 |
# File 'lib/karafka/swarm/supervisor.rb', line 37 def initialize @mutex = Mutex.new @queue = Processing::TimedQueue.new end |
Instance Method Details
#run ⇒ Object
Creates needed number of forks, installs signals and starts supervision
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/karafka/swarm/supervisor.rb', line 43 def run # Validate the CLI provided options the same way as we do for the regular server cli_contract.validate!(activity_manager.to_h) # Close producer just in case. While it should not be used, we do not want even a # theoretical case since librdkafka is not thread-safe. # We close it prior to forking just to make sure, there is no issue with initialized # producer (should not be initialized but just in case) Karafka.producer.close Karafka::App.warmup manager.start process.on_sigint { stop } process.on_sigquit { stop } process.on_sigterm { stop } process.on_sigtstp { quiet } process.on_sigttin { signal('TTIN') } # Needed to be registered as we want to unlock on child changes process.on_sigchld {} process.on_any_active { unlock } process.supervise Karafka::App.supervise! loop do return if Karafka::App.terminated? lock control end # If the cli contract validation failed reraise immediately and stop the process rescue Karafka::Errors::InvalidConfigurationError => e raise e # If anything went wrong during supervision, signal this and die # Supervisor is meant to be thin and not cause any issues. If you encounter this case # please report it as it should be considered critical rescue StandardError => e monitor.instrument( 'error.occurred', caller: self, error: e, manager: manager, type: 'swarm.supervisor.error' ) manager.terminate manager.cleanup raise e end |