Class: Karafka::Web::Tracking::Consumers::Reporter
- Defined in:
- lib/karafka/web/tracking/consumers/reporter.rb
Overview
Reports the collected data about the process and sends it, so we can use it in the UI
Constant Summary collapse
- MUTEX =
This mutex is shared between tracker and samplers so there is no case where metrics would be collected same time tracker reports
Mutex.new
Instance Method Summary collapse
-
#active? ⇒ Boolean
We never report in initializing phase because things are not yet fully configured We never report in the initialized because server is not yet ready until Karafka is fully running and some of the things like listeners are not yet available.
-
#initialize ⇒ Reporter
constructor
A new instance of Reporter.
-
#report(forced: false) ⇒ Object
Dispatches the current state from sampler to appropriate topics.
-
#report! ⇒ Object
Reports bypassing frequency check.
Constructor Details
#initialize ⇒ Reporter
Returns a new instance of Reporter.
13 14 15 16 17 18 19 |
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 13 def initialize super # Move back so first report is dispatched fast to indicate, that the process is alive @tracked_at = monotonic_now - 10_000 @report_contract = Consumers::Contracts::Report.new @error_contract = Tracking::Contracts::Error.new end |
Instance Method Details
#active? ⇒ Boolean
We never report in initializing phase because things are not yet fully configured We never report in the initialized because server is not yet ready until Karafka is fully running and some of the things like listeners are not yet available
This method will also be false
in case we are not running in karafka server
or in embedding, because in those cases Karafka does not go beyond the initialized
phase
29 30 31 32 33 34 35 36 37 |
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 29 def active? # If we do not have a producer that we could use to report or it was closed, we cannot # and should not report return false unless super return false if ::Karafka::App.initializing? return false if ::Karafka::App.initialized? true end |
#report(forced: false) ⇒ Object
Dispatches the current state from sampler to appropriate topics
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 43 def report(forced: false) # Do not even mutex if not needed return unless report?(forced) # We run this sampling before the mutex so sampling does not stop things in case # other threads would need this mutex. This can take up to 25ms and we do not want to # block during this time sampler.sample MUTEX.synchronize do return unless report?(forced) @tracked_at = monotonic_now report = sampler.to_report @report_contract.validate!(report) process_id = report[:process][:id] # Report consumers statuses = [ { topic: ::Karafka::Web.config.topics.consumers.reports, payload: Zlib::Deflate.deflate(report.to_json), key: process_id, partition: 0, headers: { 'zlib' => 'true' } } ] # Report errors that occurred (if any) += sampler.errors.map do |error| @error_contract.validate!(error) { topic: Karafka::Web.config.topics.errors, payload: Zlib::Deflate.deflate(error.to_json), # Always dispatch errors from the same process to the same partition key: process_id, headers: { 'zlib' => 'true' } } end produce() # Clear the sampler so it tracks new state changes without previous once impacting # the data sampler.clear end end |
#report! ⇒ Object
Reports bypassing frequency check. This can be used to report when state changes in the process drastically. For example when process is stopping, we want to indicate this as fast as possible in the UI, etc.
98 99 100 |
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 98 def report! report(forced: true) end |