Class: Karafka::Core::Monitoring::Notifications
- Inherits:
-
Object
- Object
- Karafka::Core::Monitoring::Notifications
- Includes:
- Helpers::Time
- Defined in:
- lib/karafka/core/monitoring/notifications.rb
Overview
A simple notifications layer for Karafka ecosystem that aims to provide API compatible with both ActiveSupport::Notifications
and dry-monitor
.
We do not use any of them by default as our use-case is fairly simple and we do not want to have too many external dependencies.
Constant Summary collapse
- EventNotRegistered =
Raised when someone wants to publish event that was not registered
Class.new(StandardError)
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#clear(event_id = nil) ⇒ Object
Clears all the subscribed listeners.
-
#initialize ⇒ Notifications
constructor
A new instance of Notifications.
-
#instrument(event_id, payload = EMPTY_HASH) {|Proc| ... } ⇒ Object
Allows for code instrumentation Runs the provided code and sends the instrumentation details to all registered listeners.
-
#register_event(event_id) ⇒ Object
Registers a new event on which we can publish.
-
#subscribe(event_id_or_listener, &block) ⇒ Object
Allows for subscription to an event There are two ways you can subscribe: via block or via listener.
Methods included from Helpers::Time
Constructor Details
#initialize ⇒ Notifications
Returns a new instance of Notifications.
24 25 26 27 28 29 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 24 def initialize @listeners = {} @mutex = Mutex.new # This allows us to optimize the method calling lookups @events_methods_map = {} end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
14 15 16 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 14 def name @name end |
Instance Method Details
#clear(event_id = nil) ⇒ Object
Clears all the subscribed listeners. If given an event, only clear listeners for the given event type.
44 45 46 47 48 49 50 51 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 44 def clear(event_id = nil) @mutex.synchronize do return @listeners.each_value(&:clear) unless event_id return @listeners[event_id].clear if @listeners.key?(event_id) raise(EventNotRegistered, "#{event_id} not registered!") end end |
#instrument(event_id, payload = EMPTY_HASH) {|Proc| ... } ⇒ Object
Allows for code instrumentation Runs the provided code and sends the instrumentation details to all registered listeners
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 99 def instrument(event_id, payload = EMPTY_HASH) assigned_listeners = @listeners[event_id] # Allow for instrumentation of only events we registered. If listeners array does not # exist, it means the event was not registered. raise EventNotRegistered, event_id unless assigned_listeners if block_given? # No point in instrumentation when no one is listening # Since the outcome will be ignored, we may as well save on allocations # There are many events that happen often like (`message.acknowledged`) that most # users do not subscribe to. Such check prevents us from publishing events that would # not be used at all saving on time measurements and objects allocations return yield if assigned_listeners.empty? start = monotonic_now result = yield time = monotonic_now - start elsif assigned_listeners.empty? # Skip measuring or doing anything if no one listening return end event = Event.new( event_id, time ? payload.merge(time: time) : payload ) assigned_listeners.each do |listener| if listener.is_a?(Proc) listener.call(event) else listener.send(@events_methods_map[event_id], event) end end result end |
#register_event(event_id) ⇒ Object
Registers a new event on which we can publish
34 35 36 37 38 39 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 34 def register_event(event_id) @mutex.synchronize do @listeners[event_id] = [] @events_methods_map[event_id] = :"on_#{event_id.to_s.tr('.', '_')}" end end |
#subscribe(event_id_or_listener, &block) ⇒ Object
Allows for subscription to an event There are two ways you can subscribe: via block or via listener.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/karafka/core/monitoring/notifications.rb', line 67 def subscribe(event_id_or_listener, &block) @mutex.synchronize do if block event_id = event_id_or_listener raise EventNotRegistered, event_id unless @listeners.key?(event_id) @listeners[event_id] << block else listener = event_id_or_listener @listeners.each_key do |reg_event_id| next unless listener.respond_to?(@events_methods_map[reg_event_id]) @listeners[reg_event_id] << listener end end end end |