Class: Karafka::Web::Pro::Commanding::Manager

Inherits:
Object
  • Object
show all
Includes:
Helpers::Async, Singleton
Defined in:
lib/karafka/web/pro/commanding/manager.rb

Overview

Manager responsible for receiving commands and taking appropriate actions It uses the assign API instead of subscribe and it does NOT publish or change anything Since its subscription is not user-related and does not run any work in the workers, it is not visible in the statistics.

There are few critical things here worth keeping in mind: - We do not use dynamic routing here and we do not inject active consumption - We use a direct assign API to get an “invisible” (from the end user) perspective connection to the commands topic for management. This is done that way so the end user does not see this connection within the UI as it should not be a manageable one anyhow. Also, on top of that, because we handle it under the hood, this is also not prone to saturation and other issues that can arise when working under stress. Thanks to that, probing can be handled almost immediately on command arrival. - Messages causing errors will be ignored and won’t block. - Any errors are reported back to the Karafka monitor pipeline.

Instance Method Summary collapse

Constructor Details

#initializeManager

Returns a new instance of Manager.



37
38
39
40
# File 'lib/karafka/web/pro/commanding/manager.rb', line 37

def initialize
  @listener = Listener.new
  @matcher = Matcher.new
end

Instance Method Details

#on_app_running(_event) ⇒ Object

When app starts to run, we start to listen for commands

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


45
46
47
# File 'lib/karafka/web/pro/commanding/manager.rb', line 45

def on_app_running(_event)
  async_call('karafka.web.pro.commanding.manager')
end

#on_app_stopped(_event) ⇒ Object

This ensures that in case of super fast shutdown, we wait on this in case it would be slower not to end up with a semi-closed iterator.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


60
61
62
# File 'lib/karafka/web/pro/commanding/manager.rb', line 60

def on_app_stopped(_event)
  @thread&.join
end

#on_app_stopping(_event) ⇒ Object

When app stops, we stop the manager

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


52
53
54
# File 'lib/karafka/web/pro/commanding/manager.rb', line 52

def on_app_stopping(_event)
  @listener.stop
end