Class: Karafka::Web::Tracking::Consumers::Listeners::Processing

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/web/tracking/consumers/listeners/processing.rb

Overview

Listener that is used to collect metrics related to work processing

Instance Method Summary collapse

Instance Method Details

#on_consumer_consume(event) ⇒ Object

Counts work execution and processing states in consumer instances

Parameters:

  • event (Karafka::Core::Monitoring::Event)


45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 45

def on_consumer_consume(event)
  consumer = event.payload[:caller]
  messages_count = consumer.messages.size
  jid = job_id(consumer, 'consume')
  job_details = job_details(consumer, 'consume')

  track do |sampler|
    # We count batches and messages prior to the execution, so they are tracked even
    # if error occurs, etc.
    sampler.counters[:batches] += 1
    sampler.counters[:messages] += messages_count
    sampler.jobs[jid] = job_details
  end
end

#on_consumer_consumed(event) ⇒ Object

Collect info about consumption event that occurred and its metrics Removes the job from running jobs

Parameters:

  • event (Karafka::Core::Monitoring::Event)


64
65
66
67
68
69
70
71
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 64

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  jid = job_id(consumer, 'consume')

  track do |sampler|
    sampler.jobs.delete(jid)
  end
end

#on_error_occurred(event) ⇒ Object

Removes failed job from active jobs

Parameters:

  • event (Karafka::Core::Monitoring::Event)


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 76

def on_error_occurred(event)
  track do |sampler|
    type = case event[:type]
           when 'consumer.consume.error'
             'consume'
           when 'consumer.revoked.error'
             'revoked'
           when 'consumer.shutdown.error'
             'shutdown'
           when 'consumer.tick.error'
             'tick'
           # This is not a user facing execution flow, but internal system one
           # that is why it will not be reported as a separate job for the UI
           when 'consumer.idle.error'
             false
           else
             false
           end

    # job reference only exists for consumer work related operations.
    # Only for them we need to deregister the job reference.
    # This also refers only to consumer work that runs user operations.
    return unless type

    sampler.jobs.delete(
      job_id(event[:caller], type)
    )
  end
end

#on_worker_processed(event) ⇒ Object

Collect time metrics about worker work execution time

Parameters:

  • event (Karafka::Core::Monitoring::Event)


13
14
15
16
17
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 13

def on_worker_processed(event)
  track do |sampler|
    sampler.windows.m1[:processed_total_time] << event[:time]
  end
end