Class: Karafka::Web::Tracking::Producers::Reporter

Inherits:
Reporter
  • Object
Defined in:
lib/karafka/web/tracking/producers/reporter.rb

Overview

Note:

The producer reporter does not need to operate with forced dispatch, mainly because there is no expectation of immediate status updates for producers and their dispatch flow is always periodic.

Reports the data collected about the producer and dispatches it so that it can be used in the UI.

Constant Summary

MUTEX =

This mutex is shared between the tracker and the samplers, so metrics are never collected at the same time as the tracker reports.

Mutex.new
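
The sharing described above can be illustrated with a minimal sketch. `DemoSampler`, `DemoReporter`, and `LOCK` are hypothetical stand-ins, not the Karafka Web classes; the sketch only assumes that both sides take the same lock before touching the collected errors.

```ruby
# Hypothetical stand-ins for illustration; not the Karafka Web classes.
LOCK = Mutex.new

class DemoSampler
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Collect under the shared lock so a report never runs mid-collection
  def track(error)
    LOCK.synchronize { @errors << error }
  end

  def clear
    @errors.clear
  end
end

class DemoReporter
  attr_reader :dispatched

  def initialize(sampler)
    @sampler = sampler
    @dispatched = []
  end

  # Copy and clear under the same lock, so nothing is tracked in between
  def report
    LOCK.synchronize do
      @dispatched.concat(@sampler.errors)
      @sampler.clear
    end
  end
end

sampler = DemoSampler.new
reporter = DemoReporter.new(sampler)

writers = 4.times.map do |i|
  Thread.new { 100.times { |j| sampler.track("error-#{i}-#{j}") } }
end
flusher = Thread.new { 20.times { reporter.report } }

(writers + [flusher]).each(&:join)
reporter.report # final flush once all writers are done

puts reporter.dispatched.size # 400: every tracked error was reported once
```

Without the shared lock, an error tracked between the copy and the clear could be dropped before it was ever reported.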

Instance Method Summary

Methods inherited from Reporter

#active?

Constructor Details

#initialize ⇒ Reporter

Returns a new instance of Reporter.



# File 'lib/karafka/web/tracking/producers/reporter.rb', line 24

def initialize
  super
  # If there are any errors right after we started sampling, dispatch them immediately
  @tracked_at = monotonic_now - 10_000
  @error_contract = Tracking::Contracts::Error.new
end
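
The effect of backdating `@tracked_at` can be sketched as follows. The `report?` check is not shown on this page, so the `due?` method and the 5,000 ms interval below are assumptions, as is the idea that `monotonic_now` returns monotonic time in milliseconds.

```ruby
# Hypothetical interval in milliseconds; the real value would come from configuration
INTERVAL_MS = 5_000

# Assumed equivalent of monotonic_now: monotonic clock reading in milliseconds
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# Assumed shape of the interval check: has enough time passed since the last report?
def due?(tracked_at, now, interval = INTERVAL_MS)
  (now - tracked_at) >= interval
end

now = monotonic_now_ms

backdated = now - 10_000 # what the constructor does
fresh = now              # what a non-backdated start would look like

puts due?(backdated, now) # true: the first report is eligible right away
puts due?(fresh, now)     # false: the first report would wait a full interval
```

Under these assumptions, the first report is eligible immediately only while the interval is no longer than 10,000 ms.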

Instance Method Details

#report ⇒ Object

Dispatches the current state from sampler to appropriate topics



# File 'lib/karafka/web/tracking/producers/reporter.rb', line 32

def report
  MUTEX.synchronize do
    return unless report?

    @tracked_at = monotonic_now

    # Report errors that occurred (if any)
    messages = sampler.errors.map do |error|
      @error_contract.validate!(error)

      {
        topic: Karafka::Web.config.topics.errors,
        payload: error.to_json,
        # Always dispatch errors from the same process to the same partition
        key: error[:process][:name]
      }
    end

    return if messages.empty?

    produce(messages)

    # Clear the sampler so it tracks new state changes without the previous ones
    # affecting the data
    sampler.clear
  end
end
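
The flow above can be approximated in isolation. This is a sketch under stated assumptions: the topic name, the error hashes, `REPORT_LOCK`, `build_messages`, and `demo_report` are all hypothetical, and appending to an array stands in for the real `produce` call. It shows two things: errors from one process share a message key, and a `return` inside `synchronize` still releases the lock.

```ruby
require 'json'

# Hypothetical topic name; the real one comes from Karafka::Web.config.topics.errors
ERRORS_TOPIC = 'example_errors_topic'
REPORT_LOCK = Mutex.new

# Turn error hashes into message hashes keyed by the reporting process name
def build_messages(errors)
  errors.map do |error|
    {
      topic: ERRORS_TOPIC,
      payload: error.to_json,
      key: error[:process][:name]
    }
  end
end

# Stand-in for the report flow: appends to dispatched instead of producing to Kafka
def demo_report(errors, dispatched)
  REPORT_LOCK.synchronize do
    messages = build_messages(errors)

    # Early return leaves the method; synchronize still unlocks on the way out
    return :nothing_to_send if messages.empty?

    dispatched.concat(messages)
    errors.clear
    :sent
  end
end

# Made-up error entries from a single process
errors = [
  { type: 'example.error_a', process: { name: 'example-host:1:abc' } },
  { type: 'example.error_b', process: { name: 'example-host:1:abc' } }
]
dispatched = []

first = demo_report(errors, dispatched)
second = demo_report(errors, dispatched)

puts first                                      # sent
puts second                                     # nothing_to_send
puts dispatched.map { |m| m[:key] }.uniq.inspect # ["example-host:1:abc"]
puts REPORT_LOCK.locked?                         # false
```

Because Kafka's default partitioner maps equal keys to the same partition, keying by process name keeps the errors of one process together and in order.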