Class: Karafka::Core::Monitoring::Notifications

Inherits:
Object
  • Object
show all
Includes:
Helpers::Time
Defined in:
lib/karafka/core/monitoring/notifications.rb

Overview

A simple notifications layer for Karafka ecosystem that aims to provide API compatible with both ActiveSupport::Notifications and dry-monitor.

We do not use any of them by default as our use-case is fairly simple and we do not want to have too many external dependencies.

Constant Summary collapse

EventNotRegistered =

Raised when someone wants to publish event that was not registered

Class.new(StandardError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers::Time

#float_now, #monotonic_now

Constructor Details

#initializeNotifications

Returns a new instance of Notifications.



24
25
26
27
28
29
# File 'lib/karafka/core/monitoring/notifications.rb', line 24

def initialize
  @listeners = Hash.new { |hash, key| hash[key] = [] }
  @mutex = Mutex.new
  # This allows us to optimize the method calling lookups
  @events_methods_map = {}
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



14
15
16
# File 'lib/karafka/core/monitoring/notifications.rb', line 14

def name
  @name
end

Instance Method Details

#clearObject

Clears all the subscribed listeners



42
43
44
45
46
# File 'lib/karafka/core/monitoring/notifications.rb', line 42

def clear
  @mutex.synchronize do
    @listeners.each_value(&:clear)
  end
end

#instrument(event_id, payload = EMPTY_HASH, &block) ⇒ Object

Allows for code instrumentation Runs the provided code and sends the instrumentation details to all registered listeners

Examples:

Instrument some code

instrument('sleeping') do
  sleep(1)
end

Parameters:

  • event_id (String)

    id of the event

  • payload (Hash) (defaults to: EMPTY_HASH)

    payload for the instrumentation

  • block (Proc)

    instrumented code

Returns:

  • (Object)

    whatever the provided block (if any) returns

Raises:



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/karafka/core/monitoring/notifications.rb', line 94

def instrument(event_id, payload = EMPTY_HASH, &block)
  # Allow for instrumentation of only events we registered
  raise EventNotRegistered, event_id unless @listeners.key?(event_id)

  result, time = measure_time_taken(&block) if block_given?

  event = Event.new(
    event_id,
    time ? payload.merge(time: time) : payload
  )

  @listeners[event_id].each do |listener|
    if listener.is_a?(Proc)
      listener.call(event)
    else
      listener.send(@events_methods_map[event_id], event)
    end
  end

  result
end

#register_event(event_id) ⇒ Object

Registers a new event on which we can publish

Parameters:

  • event_id (String)

    event id



34
35
36
37
38
39
# File 'lib/karafka/core/monitoring/notifications.rb', line 34

def register_event(event_id)
  @mutex.synchronize do
    @listeners[event_id]
    @events_methods_map[event_id] = :"on_#{event_id.to_s.tr('.', '_')}"
  end
end

#subscribe(event_id_or_listener, &block) ⇒ Object

Allows for subscription to an event There are two ways you can subscribe: via block or via listener.

Examples:

Subscribe using listener

subscribe(MyListener.new)

Subscribe via block

subscribe do |event|
  puts event
end

Parameters:

  • event_id_or_listener (Object)

    event id when we want to subscribe to a particular event with a block or listener if we want to subscribe with general listener

  • block (Proc)

    block of code if we want to subscribe with it



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/karafka/core/monitoring/notifications.rb', line 62

def subscribe(event_id_or_listener, &block)
  @mutex.synchronize do
    if block
      event_id = event_id_or_listener

      raise EventNotRegistered, event_id unless @listeners.key?(event_id)

      @listeners[event_id] << block
    else
      listener = event_id_or_listener

      @listeners.each_key do |reg_event_id|
        next unless listener.respond_to?(@events_methods_map[reg_event_id])

        @listeners[reg_event_id] << listener
      end
    end
  end
end