Class: Karafka::Web::Tracking::Producers::Reporter
- Defined in:
- lib/karafka/web/tracking/producers/reporter.rb
Overview
Note:
Producer reported does not have to operate with the forced
dispatch mainly because there is no expectation on immediate status updates for producers and their dispatch flow is always periodic based.
Reports the collected data about the producer and sends it, so we can use it in the UI
Constant Summary collapse
- MUTEX =
This mutex is shared between tracker and samplers so there is no case where metrics would be collected same time tracker reports
Mutex.new
Instance Method Summary collapse
-
#initialize ⇒ Reporter
constructor
A new instance of Reporter.
-
#report ⇒ Object
Dispatches the current state from sampler to appropriate topics.
Methods inherited from Reporter
Constructor Details
#initialize ⇒ Reporter
Returns a new instance of Reporter.
17 18 19 20 21 22 |
# File 'lib/karafka/web/tracking/producers/reporter.rb', line 17 def initialize super # If there are any errors right after we started sampling, dispatch them immediately @tracked_at = monotonic_now - 10_000 @error_contract = Tracking::Contracts::Error.new end |
Instance Method Details
#report ⇒ Object
Dispatches the current state from sampler to appropriate topics
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/karafka/web/tracking/producers/reporter.rb', line 25 def report MUTEX.synchronize do return unless report? @tracked_at = monotonic_now # Report errors that occurred (if any) = sampler.errors.map do |error| @error_contract.validate!(error) { topic: Karafka::Web.config.topics.errors, payload: error.to_json, # Always dispatch errors from the same process to the same partition key: error[:process][:id] } end return if .empty? produce() # Clear the sampler so it tracks new state changes without previous once impacting # the data sampler.clear end end |