Class: Karafka::Web::Tracking::Consumers::Reporter
- Defined in:
- lib/karafka/web/tracking/consumers/reporter.rb
Overview
Reports the collected data about the process and sends it so that it can be used in the UI.
Constant Summary
- MUTEX =
This mutex is shared between the tracker and the samplers, so metrics are never collected at the same time as the tracker reports.
Mutex.new
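The effect of sharing one mutex between collection and reporting can be sketched in isolation. The class and constant names below (DemoSampler, SHARED_MUTEX) are hypothetical stand-ins, not part of Karafka Web; only the idea of a single lock guarding both sides comes from the description above.

```ruby
# Hypothetical stand-ins for the sampler and the reporter; only the shared
# mutex mirrors the documented design.
SHARED_MUTEX = Mutex.new

class DemoSampler
  def initialize
    @counts = Hash.new(0)
  end

  # Collect a metric while holding the shared lock
  def track(name)
    SHARED_MUTEX.synchronize { @counts[name] += 1 }
  end

  # Snapshot and reset in one critical section, so no update lands in between
  def flush
    SHARED_MUTEX.synchronize do
      snapshot = @counts.dup
      @counts.clear
      snapshot
    end
  end
end

sampler = DemoSampler.new

# Four threads collect metrics while another one periodically "reports"
writers = Array.new(4) { Thread.new { 1_000.times { sampler.track(:processed) } } }

reported = 0
reporter = Thread.new do
  20.times do
    reported += sampler.flush[:processed]
    sleep(0.001)
  end
end

(writers + [reporter]).each(&:join)

# Pick up whatever was tracked after the last periodic flush
reported += sampler.flush[:processed]
puts reported
```

Because the snapshot and the reset happen under the same lock that guards every increment, each tracked event ends up in exactly one flush.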
Instance Method Summary
-
#active? ⇒ Boolean
We never report in the initializing phase because things are not yet fully configured. We never report in the initialized phase either, because the server is not ready until Karafka is fully running, and some things, such as listeners, are not yet available.
-
#initialize ⇒ Reporter
constructor
A new instance of Reporter.
-
#report(forced: false) ⇒ Object
Dispatches the current state from sampler to appropriate topics.
-
#report! ⇒ Object
Reports bypassing frequency check.
Constructor Details
#initialize ⇒ Reporter
Returns a new instance of Reporter.
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 20
def initialize
  super
  # Move back so first report is dispatched fast to indicate, that the process is alive
  @tracked_at = monotonic_now - 10_000
  @report_contract = Consumers::Contracts::Report.new
  @error_contract = Tracking::Contracts::Error.new
end
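To see why moving the timestamp back makes the first report go out quickly, here is a minimal sketch. It assumes that monotonic_now returns milliseconds and that the frequency check compares elapsed time against an interval. INTERVAL_MS and report_due? are hypothetical names chosen for illustration; the real check is not shown on this page.

```ruby
# Hypothetical reporting interval in milliseconds (assumption, not the real setting)
INTERVAL_MS = 5_000

# Monotonic clock reading in milliseconds (assumed unit)
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# Hypothetical frequency check: due once a full interval has elapsed
def report_due?(tracked_at)
  (monotonic_now - tracked_at) >= INTERVAL_MS
end

fresh = monotonic_now              # timestamp set to "now"
backdated = monotonic_now - 10_000 # timestamp moved back, as in the constructor

puts report_due?(fresh)     # not due yet: the interval has not elapsed
puts report_due?(backdated) # due immediately: the offset exceeds the interval
```

With a fresh timestamp, a new process would stay silent for a whole interval. The backdated one lets the very first check pass.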
Instance Method Details
#active? ⇒ Boolean
We never report in the initializing phase because things are not yet fully configured. We never report in the initialized phase either, because the server is not ready until Karafka is fully running, and some things, such as listeners, are not yet available.
This method will also return false when we are running neither in karafka server nor in embedding, because in those cases Karafka does not go beyond the initialized phase.
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 36
def active?
  # If we do not have a producer that we could use to report or it was closed, we cannot
  # and should not report
  return false unless super
  return false if ::Karafka::App.initializing?
  return false if ::Karafka::App.initialized?

  true
end
#report(forced: false) ⇒ Object
Dispatches the current state from sampler to appropriate topics
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 50
def report(forced: false)
  # Do not even mutex if not needed
  return unless report?(forced)

  # We run this sampling before the mutex so sampling does not stop things in case
  # other threads would need this mutex. This can take up to 25ms and we do not want to
  # block during this time
  sampler.sample

  MUTEX.synchronize do
    return unless report?(forced)

    @tracked_at = monotonic_now

    report = sampler.to_report

    @report_contract.validate!(report)

    process_name = report[:process][:name]

    # Report consumers statuses
    messages = [
      {
        topic: ::Karafka::Web.config.topics.consumers.reports,
        payload: Zlib::Deflate.deflate(report.to_json),
        key: process_name,
        partition: 0,
        headers: { 'zlib' => 'true' }
      }
    ]

    # Report errors that occurred (if any)
    messages += sampler.errors.map do |error|
      @error_contract.validate!(error)

      {
        topic: Karafka::Web.config.topics.errors,
        payload: Zlib::Deflate.deflate(error.to_json),
        # Always dispatch errors from the same process to the same partition
        key: process_name,
        headers: { 'zlib' => 'true' }
      }
    end

    produce(messages)

    # Clear the sampler so it tracks new state changes without previous ones impacting
    # the data
    sampler.clear
  end
end
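The payload format used above (JSON, deflated with Zlib, flagged by a 'zlib' header) can be tried out in a round trip. The report contents and the decode helper below are hypothetical and only illustrate how a reader of such messages might reverse the encoding. They are not the actual Karafka Web deserializer.

```ruby
require 'json'
require 'zlib'

# Hypothetical report payload; real reports carry far more fields
report = { process: { name: 'host:1234:abcdef' }, stats: { busy: 1 } }

# Producer side: same shape as the messages built in #report
message = {
  payload: Zlib::Deflate.deflate(report.to_json),
  key: report[:process][:name],
  headers: { 'zlib' => 'true' }
}

# Consumer side (hypothetical helper): inflate only when the header says so
def decode(message)
  raw = message[:payload]
  raw = Zlib::Inflate.inflate(raw) if message[:headers]['zlib'] == 'true'
  JSON.parse(raw, symbolize_names: true)
end

decoded = decode(message)
puts decoded[:process][:name]
```

Keeping the compression flag in a header lets a reader decide whether to inflate without inspecting the payload bytes.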
#report! ⇒ Object
Reports bypassing the frequency check. This can be used to report when the state of the process changes drastically. For example, when the process is stopping, we want to indicate this in the UI as fast as possible.
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 105
def report!
  report(forced: true)
end
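The interplay between #report and #report! can be illustrated with a small stand-alone class. ThrottledReporter, its interval, and its counter are hypothetical. The sketch only mirrors the pattern visible in the source: a cheap check before taking the lock, a second check inside it, and a forced flag that skips the frequency check.

```ruby
# Hypothetical throttled reporter; names and interval are illustrative only
class ThrottledReporter
  LOCK = Mutex.new

  attr_reader :dispatched

  def initialize(interval_ms)
    @interval_ms = interval_ms
    # Start well in the past so the first report goes out immediately
    @tracked_at = now_ms - (2 * interval_ms)
    @dispatched = 0
  end

  def report(forced: false)
    # Cheap check first, so most calls never touch the lock
    return unless due?(forced)

    LOCK.synchronize do
      # Re-check: another thread may have reported while we waited for the lock
      return unless due?(forced)

      @tracked_at = now_ms
      @dispatched += 1
    end
  end

  # Bypass the frequency check, e.g. when the process is stopping
  def report!
    report(forced: true)
  end

  private

  def now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
  end

  def due?(forced)
    forced || (now_ms - @tracked_at) >= @interval_ms
  end
end

reporter = ThrottledReporter.new(60_000)
reporter.report  # dispatched: the timestamp starts in the past
reporter.report  # skipped: the interval has not elapsed
reporter.report! # dispatched: forced
puts reporter.dispatched
```

The second check inside the lock matters when several threads pass the first check at once: only the first one through dispatches, and the rest return early.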