Class: Karafka::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/server.rb

Overview

Karafka consuming server class

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.execution_modeObject

Mode in which the Karafka server is executed. It can be:

  • :standalone - regular karafka consumer process

  • :embedded - embedded in a different process and not supervised

  • :supervisor - swarm supervisor process

  • :swarm - one of swarm processes

Sometimes it is important to know in what mode we operate, especially from UI perspective as not everything is possible when operating in non-standalone mode, etc.



31
32
33
# File 'lib/karafka/server.rb', line 31

def execution_mode
  @execution_mode
end

.jobs_queueObject

Jobs queue



20
21
22
# File 'lib/karafka/server.rb', line 20

def jobs_queue
  @jobs_queue
end

.listenersObject

Set of consuming threads. Each consumer thread contains a single consumer



14
15
16
# File 'lib/karafka/server.rb', line 14

def listeners
  @listeners
end

.workersObject

Set of workers



17
18
19
# File 'lib/karafka/server.rb', line 17

def workers
  @workers
end

Class Method Details

.quietObject

Quiets the Karafka server.

Karafka will stop processing but won’t quit the consumer group, so no rebalance will be triggered until final shutdown.



167
168
169
170
171
# File 'lib/karafka/server.rb', line 167

def quiet
  # We don't have to safe-guard it with check states as the state transitions work only
  # in one direction
  Karafka::App.quiet!
end

.runObject

Method which runs app



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/karafka/server.rb', line 34

def run
  self.listeners = []
  self.workers = []

  # We need to validate this prior to running because it may be executed also from the
  # embedded
  # We cannot validate this during the start because config needs to be populated and routes
  # need to be defined.
  config.internal.cli.contract.validate!(
    config.internal.routing.activity_manager.to_h
  )

  # We clear as we do not want parent handlers in case of working from fork
  process.clear
  process.on_sigint { stop }
  process.on_sigquit { stop }
  process.on_sigterm { stop }
  process.on_sigtstp { quiet }
  # Needed for instrumentation
  process.on_sigttin {}
  process.supervise

  # This will only run when not in a swarm mode. In swarm mode the server runs post-fork, so
  # warmup will do nothing
  Karafka::App.warmup

  # Start is blocking until stop is called and when we stop, it will wait until
  # all of the things are ready to stop
  start

  # We always need to wait for Karafka to stop here since we should wait for the stop running
  # in a separate thread (or trap context) to indicate everything is closed
  # Since `#start` is blocking, we will get here only after the runner is done. This will
  # not add any performance degradation because of that.
  sleep(0.1) until Karafka::App.terminated?
# Try its best to shutdown underlying components before re-raising
# rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  stop

  raise e
end

.startObject

Note:

We don’t need to sleep because Karafka::Runner is locking and waiting to finish loop

Starts Karafka with a supervision (and it won’t happen until we explicitly want to stop)



81
82
83
# File 'lib/karafka/server.rb', line 81

def start
  Karafka::Runner.new.call
end

.stopObject

Note:

This method is not async. It should not be executed from the workers as it will lock them forever. If you need to run Karafka shutdown from within workers threads, please start a separate thread to do so.

Stops Karafka with a supervision (as long as there is a shutdown timeout) If consumers or workers won’t stop in a given time frame, it will force them to exit



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/karafka/server.rb', line 91

def stop
  # Initialize the stopping process only if Karafka was running
  return if Karafka::App.stopping?
  return if Karafka::App.stopped?
  return if Karafka::App.terminated?

  Karafka::App.stop!

  timeout = config.shutdown_timeout

  # We check from time to time (for the timeout period) if all the threads finished
  # their work and if so, we can just return and normal shutdown process will take place
  # We divide it by 1000 because we use time in ms.
  ((timeout / 1_000) * (1 / config.internal.supervision_sleep)).to_i.times do
    all_listeners_stopped = listeners.all?(&:stopped?)
    all_workers_stopped = workers.none?(&:alive?)

    return if all_listeners_stopped && all_workers_stopped

    sleep(config.internal.supervision_sleep)
  end

  raise Errors::ForcefulShutdownError
rescue Errors::ForcefulShutdownError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'app.stopping.error'
  )

  # We're done waiting, lets kill them!
  workers.each(&:terminate)
  listeners.active.each(&:terminate)

  # We always need to shutdown clients to make sure we do not force the GC to close consumer.
  # This can cause memory leaks and crashes.
  # We run it in a separate thread in case this would hang and we ignore it after the time
  # we assigned to it and force shutdown as we prefer to stop the process rather than wait
  # indefinitely even with risk of VM crash as this is a last resort.
  Thread.new do
    listeners.each(&:shutdown)
  rescue StandardError => e
    # If anything wrong happened during shutdown, we also want to record it
    Karafka.monitor.instrument(
      'error.occurred',
      caller: self,
      error: e,
      type: 'app.forceful_stopping.error'
    )
  end.join(FORCEFUL_SHUTDOWN_WAIT)

  # We also do not forcefully terminate everything when running in the embedded mode,
  # otherwise we would overwrite the shutdown process of the process that started Karafka
  return unless process.supervised?

  # exit! is not within the instrumentation as it would not trigger due to exit
  Kernel.exit!(config.internal.forceful_exit_code)
ensure
  # We need to check if it wasn't an early exit to make sure that only on stop invocation
  # can change the status after everything is closed
  if timeout
    Karafka::App.stopped!

    # We close producer as the last thing as it can be used in the notification pipeline
    # to dispatch state changes, etc
    Karafka::App.producer.close

    Karafka::App.terminate!
  end
end