Class: Karafka::Swarm::Supervisor

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/swarm/supervisor.rb

Overview

Note:

Technically speaking, the supervisor is never in the running state, because we do not want it to hold any sockets or other resources that could break under forking. It has its own “supervising” state, from which it can transition to the final shutdown.

Supervisor that starts the forks and uses a monitor to watch them. It also handles the shutdown of all the processes, including itself.

If any node dies, it is restarted.
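The fork-and-restart idea can be illustrated with plain Ruby process primitives. The sketch below is not Karafka's implementation: `spawn_node`, `supervise`, and the placeholder node body (a short sleep) are hypothetical, and it only works on platforms that support `Process.fork` (POSIX, not Windows). Each node is a forked child; whenever `Process.wait2` reports that a child exited, a replacement is forked for the same slot.

```ruby
# Minimal fork-and-restart sketch (hypothetical; NOT Karafka's code).

# Forks one hypothetical "node". A real node would boot its own server and
# create its own Kafka clients, since librdkafka handles must not be shared
# across a fork.
def spawn_node(index)
  Process.fork do
    sleep(0.05 * (index + 1)) # placeholder work
    exit!(0)                  # skip at_exit hooks inherited from the parent
  end
end

# Keeps nodes_count children alive until max_restarts replacements have been
# forked, then stops the remaining children. Returns the number of restarts.
def supervise(nodes_count, max_restarts)
  nodes = {} # pid => node index
  nodes_count.times { |index| nodes[spawn_node(index)] = index }
  restarts = 0

  while restarts < max_restarts
    pid, _status = Process.wait2(-1) # block until any child exits
    index = nodes.delete(pid)
    next if index.nil? # not one of our nodes

    nodes[spawn_node(index)] = index # restart the dead node in the same slot
    restarts += 1
  end

  # Shut down whatever is still running and reap it
  nodes.each_key do |pid|
    Process.kill('TERM', pid)
  rescue Errno::ESRCH
    # The child is already gone
  end
  nodes.each_key { |pid| Process.wait(pid) }
  restarts
end
```

For example, `supervise(2, 3)` starts two nodes, restarts them three times in total as they exit, and then terminates the survivors.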

Instance Method Summary

Constructor Details

#initialize ⇒ Supervisor

Returns a new instance of Supervisor.

# File 'lib/karafka/swarm/supervisor.rb', line 37

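def initialize
  # Serializes supervisor operations that may be triggered concurrently
  @mutex = Mutex.new
  # The supervision loop blocks on this queue (lock) until an event pushes to it (unlock)
  @queue = Processing::TimedQueue.new
end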
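The supervisor pairs `lock` (wait on the queue) with `unlock` (push to it), so its loop wakes either on an event or after a timeout. A minimal timed queue with that behavior can be built from `Mutex` and `ConditionVariable`. The sketch below is hypothetical and is not Karafka's `Processing::TimedQueue`; the class name `SimpleTimedQueue` and its `pop(timeout:)` signature are assumptions for illustration.

```ruby
# Minimal timed queue sketch (hypothetical; not Karafka's Processing::TimedQueue).
# pop waits at most `timeout` seconds and returns nil if nothing was pushed.
class SimpleTimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Adds an item and wakes one waiting consumer
  def push(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
    self
  end
  alias_method :<<, :push

  # Returns the oldest item, or nil if none arrives within `timeout` seconds
  def pop(timeout:)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        # Releases the mutex while sleeping; may wake early, so re-check the loop
        @cond.wait(@mutex, remaining)
      end

      @items.shift
    end
  end
end
```

Because this sketch relies on `Mutex`, calling `push` directly inside a `Signal.trap` block would raise `ThreadError` ("can't be called from trap context"). A handler running in trap context would need to defer the push to another thread.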

Instance Method Details

#run ⇒ Object

Creates the needed number of forks, installs signal handlers, and starts supervision.

# File 'lib/karafka/swarm/supervisor.rb', line 43

def run
  # Validate the CLI provided options the same way as we do for the regular server
  cli_contract.validate!(activity_manager.to_h)

  # Close the producer just in case. While it should not be used here, we want to rule
  # out even a theoretical problem, since librdkafka is not fork-safe.
  # We close it prior to forking to make sure there is no issue with an initialized
  # producer (it should not be initialized, but just in case)
  Karafka.producer.close

  Karafka::App.warmup

  manager.start

  process.on_sigint { stop }
  process.on_sigquit { stop }
  process.on_sigterm { stop }
  process.on_sigtstp { quiet }
  process.on_sigttin { signal('TTIN') }
  # Must be registered (even with an empty block) because we want to unlock on child changes
  process.on_sigchld {}
  process.on_any_active { unlock }
  process.supervise

  Karafka::App.supervise!

  loop do
    return if Karafka::App.terminated?

    lock
    control
  end

# If the CLI contract validation failed, re-raise immediately and stop the process
rescue Karafka::Errors::InvalidConfigurationError => e
  raise e
# If anything went wrong during supervision, signal this and die.
# The supervisor is meant to be thin and should not cause any issues. If you encounter
# this case, please report it, as it should be considered critical
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    manager: manager,
    type: 'swarm.supervisor.error'
  )

  manager.terminate
  manager.cleanup

  raise e
end
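The error handling in `#run` follows a common pattern: let configuration errors propagate untouched, and for anything else report the error, tear down the nodes, and re-raise. The explicit first `rescue` matters when the configuration error is itself a `StandardError` subclass, as it is in this sketch, because otherwise the generic branch would catch it. Everything below is hypothetical: `InvalidConfigError`, `FakeManager`, and `guarded_supervision` are illustrative stand-ins, and appending to `errors` stands in for the `monitor.instrument('error.occurred', ...)` call.

```ruby
# Hypothetical stand-ins for the CLI validation error and the nodes manager.
class InvalidConfigError < StandardError; end

class FakeManager
  attr_reader :calls

  def initialize
    @calls = []
  end

  def terminate
    @calls << :terminate
  end

  def cleanup
    @calls << :cleanup
  end
end

# Runs the given supervision block. Configuration errors propagate untouched;
# any other error is recorded, the nodes are torn down, and the error is re-raised.
def guarded_supervision(manager, errors)
  yield
rescue InvalidConfigError
  raise # matched first, so the generic branch below never sees it
rescue StandardError => e
  errors << e.message # stands in for error instrumentation
  manager.terminate
  manager.cleanup
  raise
end
```

With a block that raises `InvalidConfigError`, the manager is left untouched; with a block that raises any other `StandardError`, the manager records `terminate` followed by `cleanup` before the original error reaches the caller.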