Class: Karafka::Core::Monitoring::Notifications

Inherits:
Object
Includes:
Helpers::Time
Defined in:
lib/karafka/core/monitoring/notifications.rb

Overview

A simple notifications layer for the Karafka ecosystem that aims to provide an API compatible with both ActiveSupport::Notifications and dry-monitor.

We use neither of them by default, as our use case is fairly simple and we do not want too many external dependencies.

Constant Summary

EventNotRegistered =

Raised when someone tries to publish, subscribe to, or clear an event that was not registered

Class.new(StandardError)

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::Time

#float_now, #monotonic_now

Constructor Details

#initialize ⇒ Notifications

Returns a new instance of Notifications.



# File 'lib/karafka/core/monitoring/notifications.rb', line 24

def initialize
  @listeners = {}
  @mutex = Mutex.new
  # This allows us to optimize the method calling lookups
  @events_methods_map = {}
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/core/monitoring/notifications.rb', line 14

def name
  @name
end

Instance Method Details

#clear(event_id = nil) ⇒ Object

Clears all the subscribed listeners. If given an event, clears only the listeners for that event type and raises EventNotRegistered when the event is unknown. The events themselves stay registered.

Parameters:

  • event_id (String) (defaults to: nil)

    the key of the event to clear listeners for.



# File 'lib/karafka/core/monitoring/notifications.rb', line 44

def clear(event_id = nil)
  @mutex.synchronize do
    return @listeners.each_value(&:clear) unless event_id
    return @listeners[event_id].clear if @listeners.key?(event_id)

    raise(EventNotRegistered, "#{event_id} not registered!")
  end
end
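
The clearing behaviour described above can be sketched without the gem. This is a minimal stand-in, not the real class: `MiniNotifications` and its `listeners_count` helper are hypothetical names introduced only for illustration, and the block-only `subscribe` is a simplification. Only `clear` is copied from the source shown above.

```ruby
# Hypothetical stand-in that mirrors only the parts of the class needed
# to exercise #clear. It is not the Karafka implementation.
class MiniNotifications
  EventNotRegistered = Class.new(StandardError)

  def initialize
    @listeners = {}
    @mutex = Mutex.new
  end

  def register_event(event_id)
    @mutex.synchronize { @listeners[event_id] = [] }
  end

  # Simplified: block subscriptions only
  def subscribe(event_id, &block)
    @mutex.synchronize { @listeners.fetch(event_id) << block }
  end

  # Hypothetical inspection helper, not part of the documented API
  def listeners_count(event_id)
    @listeners.fetch(event_id).size
  end

  # Same body as the documented method
  def clear(event_id = nil)
    @mutex.synchronize do
      return @listeners.each_value(&:clear) unless event_id
      return @listeners[event_id].clear if @listeners.key?(event_id)

      raise(EventNotRegistered, "#{event_id} not registered!")
    end
  end
end

bus = MiniNotifications.new
bus.register_event('a')
bus.register_event('b')
bus.subscribe('a') { |_event| }
bus.subscribe('b') { |_event| }

bus.clear('a') # empties listeners of 'a' only
puts bus.listeners_count('a') # 0
puts bus.listeners_count('b') # 1

bus.clear # empties listeners of every registered event
puts bus.listeners_count('b') # 0

begin
  bus.clear('missing')
rescue MiniNotifications::EventNotRegistered => e
  puts e.message # missing not registered!
end
```

Because only the listener arrays are emptied, an event cleared this way can be subscribed to again without re-registering it.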

#instrument(event_id, payload = EMPTY_HASH) {|Proc| ... } ⇒ Object

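Allows for code instrumentation. Runs the provided code and sends the instrumentation details to all registered listeners. When a block is given and at least one listener is subscribed, the elapsed time is merged into the payload under the :time key.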

Examples:

Instrument some code

instrument('sleeping') do
  sleep(1)
end

Parameters:

  • event_id (String)

    id of the event

  • payload (Hash) (defaults to: EMPTY_HASH)

    payload for the instrumentation

Yields:

  • (Proc)

    instrumented code

Returns:

  • (Object)

    whatever the provided block (if any) returns

Raises:

  • (EventNotRegistered)

    if the event was not registered



# File 'lib/karafka/core/monitoring/notifications.rb', line 99

def instrument(event_id, payload = EMPTY_HASH)
  assigned_listeners = @listeners[event_id]

  # Allow for instrumentation of only events we registered. If listeners array does not
  # exist, it means the event was not registered.
  raise EventNotRegistered, event_id unless assigned_listeners

  if block_given?
    # No point in instrumentation when no one is listening
    # Since the outcome will be ignored, we may as well save on allocations
    # There are many events that happen often like (`message.acknowledged`) that most
    # users do not subscribe to. Such check prevents us from publishing events that would
    # not be used at all saving on time measurements and objects allocations
    return yield if assigned_listeners.empty?

    start = monotonic_now
    result = yield
    time = monotonic_now - start
  elsif assigned_listeners.empty?
    # Skip measuring or doing anything if no one listening
    return
  end

  event = Event.new(
    event_id,
    time ? payload.merge(time: time) : payload
  )

  assigned_listeners.each do |listener|
    if listener.is_a?(Proc)
      listener.call(event)
    else
      listener.send(@events_methods_map[event_id], event)
    end
  end

  result
end
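
The control flow of #instrument (fast path without listeners, timing with a block, no timing without one) can be tried out in isolation. The sketch below is a stand-in under stated assumptions: `MiniInstrumenter` is a hypothetical name, `Event` is replaced by a plain Struct, listeners are limited to blocks, and `monotonic_now` is assumed to return milliseconds because the real helper's unit is not shown in this section.

```ruby
# Hypothetical stand-in mirroring the documented #instrument flow.
class MiniInstrumenter
  EventNotRegistered = Class.new(StandardError)
  # Assumption: a plain Struct in place of the real Event class
  Event = Struct.new(:id, :payload)
  EMPTY_HASH = {}.freeze

  def initialize
    @listeners = {}
  end

  def register_event(event_id)
    @listeners[event_id] = []
  end

  def subscribe(event_id, &block)
    @listeners.fetch(event_id) << block
  end

  def instrument(event_id, payload = EMPTY_HASH)
    assigned_listeners = @listeners[event_id]
    raise EventNotRegistered, event_id unless assigned_listeners

    if block_given?
      # Nobody listens: run the block and skip timing and event allocation
      return yield if assigned_listeners.empty?

      start = monotonic_now
      result = yield
      time = monotonic_now - start
    elsif assigned_listeners.empty?
      return
    end

    event = Event.new(event_id, time ? payload.merge(time: time) : payload)
    assigned_listeners.each { |listener| listener.call(event) }
    result
  end

  private

  # Assumption: milliseconds from the monotonic clock
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

bus = MiniInstrumenter.new
bus.register_event('sleeping')

received = []
bus.subscribe('sleeping') { |event| received << event }

# With a block: the block's value is returned and :time is added
value = bus.instrument('sleeping', { attempt: 1 }) { sleep(0.01); :done }
puts value.inspect
puts received.last.payload.key?(:time)

# Without a block: the payload is delivered unchanged and nil is returned
bus.instrument('sleeping', { attempt: 2 })
puts received.last.payload.inspect
```

The block's return value is passed through in both the fast path and the measured path, so wrapping existing code in `instrument` should not change what the caller receives.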

#register_event(event_id) ⇒ Object

Registers a new event on which we can publish

Parameters:

  • event_id (String)

    event id



# File 'lib/karafka/core/monitoring/notifications.rb', line 34

def register_event(event_id)
  @mutex.synchronize do
    @listeners[event_id] = []
    @events_methods_map[event_id] = :"on_#{event_id.to_s.tr('.', '_')}"
  end
end
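
The listener method name stored in @events_methods_map is derived from the event id alone. The helper below, `listener_method_for`, is a hypothetical name introduced for illustration; its body is the same expression used in register_event above.

```ruby
# Hypothetical helper: same expression as in register_event
def listener_method_for(event_id)
  :"on_#{event_id.to_s.tr('.', '_')}"
end

puts listener_method_for('message.acknowledged').inspect # :on_message_acknowledged
puts listener_method_for(:sleeping).inspect              # :on_sleeping
```

Note also that, per the source above, register_event assigns a fresh empty array for the given id, so registering the same event a second time drops any listeners already subscribed to it.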

#subscribe(event_id_or_listener, &block) ⇒ Object

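Allows for subscription to an event. There are two ways to subscribe: via a block, which requires the id of a registered event, or via a listener object, which is attached to every registered event it has a matching on_* method for.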

Examples:

Subscribe using listener

subscribe(MyListener.new)

Subscribe via block

subscribe('sleeping') do |event|
  puts event
end

Parameters:

  • event_id_or_listener (Object)

the event id when subscribing to a particular event with a block, or a listener object when subscribing with a general listener

  • block (Proc)

    block of code if we want to subscribe with it



# File 'lib/karafka/core/monitoring/notifications.rb', line 67

def subscribe(event_id_or_listener, &block)
  @mutex.synchronize do
    if block
      event_id = event_id_or_listener

      raise EventNotRegistered, event_id unless @listeners.key?(event_id)

      @listeners[event_id] << block
    else
      listener = event_id_or_listener

      @listeners.each_key do |reg_event_id|
        next unless listener.respond_to?(@events_methods_map[reg_event_id])

        @listeners[reg_event_id] << listener
      end
    end
  end
end
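
Both subscription styles can be compared side by side in a small stand-in. Everything named here is hypothetical: `MiniBus`, its simplified `publish` method (used instead of #instrument to keep the sketch short), `SleepListener`, and the 'sleep.started' / 'sleep.finished' event ids. The `register_event` and `subscribe` bodies follow the source shown above.

```ruby
# Hypothetical stand-in; not the Karafka implementation.
class MiniBus
  EventNotRegistered = Class.new(StandardError)

  def initialize
    @listeners = {}
    @mutex = Mutex.new
    @events_methods_map = {}
  end

  def register_event(event_id)
    @mutex.synchronize do
      @listeners[event_id] = []
      @events_methods_map[event_id] = :"on_#{event_id.to_s.tr('.', '_')}"
    end
  end

  def subscribe(event_id_or_listener, &block)
    @mutex.synchronize do
      if block
        event_id = event_id_or_listener
        raise EventNotRegistered, event_id unless @listeners.key?(event_id)

        @listeners[event_id] << block
      else
        listener = event_id_or_listener
        # Attach the listener only to events it has a handler method for
        @listeners.each_key do |reg_event_id|
          next unless listener.respond_to?(@events_methods_map[reg_event_id])

          @listeners[reg_event_id] << listener
        end
      end
    end
  end

  # Hypothetical simplified dispatch, same branching as in #instrument
  def publish(event_id, event)
    @listeners.fetch(event_id).each do |listener|
      if listener.is_a?(Proc)
        listener.call(event)
      else
        listener.send(@events_methods_map[event_id], event)
      end
    end
  end
end

# Hypothetical listener handling only 'sleep.started'
class SleepListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_sleep_started(event)
    @seen << event
  end
end

bus = MiniBus.new
bus.register_event('sleep.started')
bus.register_event('sleep.finished')

listener = SleepListener.new
bus.subscribe(listener)

finished = []
bus.subscribe('sleep.finished') { |event| finished << event }

bus.publish('sleep.started', :first)
bus.publish('sleep.finished', :second)

puts listener.seen.inspect # [:first]
puts finished.inspect      # [:second]
```

Since the listener branch iterates over the events registered at the time of the call, a listener subscribed before an event is registered would not be attached to that later event in this sketch; registering events first and subscribing afterwards avoids that.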