Class: Karafka::Web::Tracking::Consumers::Listeners::Processing
- Defined in:
- lib/karafka/web/tracking/consumers/listeners/processing.rb
Overview
Listener that is used to collect metrics related to work processing
Instance Method Summary collapse
-
#on_consumer_consume(event) ⇒ Object
Counts work execution and processing states in consumer instances.
-
#on_consumer_consumed(event) ⇒ Object
Collect info about consumption event that occurred and its metrics Removes the job from running jobs.
-
#on_error_occurred(event) ⇒ Object
Removes failed job from active jobs.
-
#on_worker_processed(event) ⇒ Object
Collect time metrics about worker work execution time.
Instance Method Details
#on_consumer_consume(event) ⇒ Object
Counts work execution and processing states in consumer instances
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 45 def on_consumer_consume(event) consumer = event.payload[:caller] = consumer..size jid = job_id(consumer, 'consume') job_details = job_details(consumer, 'consume') track do |sampler| # We count batches and messages prior to the execution, so they are tracked even # if error occurs, etc. sampler.counters[:batches] += 1 sampler.counters[:messages] += sampler.jobs[jid] = job_details end end |
#on_consumer_consumed(event) ⇒ Object
Collect info about consumption event that occurred and its metrics Removes the job from running jobs
64 65 66 67 68 69 70 71 |
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 64 def on_consumer_consumed(event) consumer = event.payload[:caller] jid = job_id(consumer, 'consume') track do |sampler| sampler.jobs.delete(jid) end end |
#on_error_occurred(event) ⇒ Object
Removes failed job from active jobs
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 76 def on_error_occurred(event) track do |sampler| type = case event[:type] when 'consumer.consume.error' 'consume' when 'consumer.revoked.error' 'revoked' when 'consumer.shutdown.error' 'shutdown' when 'consumer.tick.error' 'tick' # This is not a user facing execution flow, but internal system one # that is why it will not be reported as a separate job for the UI when 'consumer.idle.error' false else false end # job reference only exists for consumer work related operations. # Only for them we need to deregister the job reference. # This also refers only to consumer work that runs user operations. return unless type sampler.jobs.delete( job_id(event[:caller], type) ) end end |
#on_worker_processed(event) ⇒ Object
Collect time metrics about worker work execution time
13 14 15 16 17 |
# File 'lib/karafka/web/tracking/consumers/listeners/processing.rb', line 13 def on_worker_processed(event) track do |sampler| sampler.windows.m1[:processed_total_time] << event[:time] end end |