Class: Karafka::Web::Tracking::Producers::Reporter

Inherits:
Reporter
  • Object
show all
Defined in:
lib/karafka/web/tracking/producers/reporter.rb

Overview

Note:

Producer reported does not have to operate with the forced dispatch mainly because there is no expectation on immediate status updates for producers and their dispatch flow is always periodic based.

Reports the collected data about the producer and sends it, so we can use it in the UI

Constant Summary collapse

MUTEX =

This mutex is shared between tracker and samplers so there is no case where metrics would be collected same time tracker reports

Mutex.new

Instance Method Summary collapse

Methods inherited from Reporter

#active?

Constructor Details

#initializeReporter

Returns a new instance of Reporter.



17
18
19
20
21
22
# File 'lib/karafka/web/tracking/producers/reporter.rb', line 17

def initialize
  super
  # If there are any errors right after we started sampling, dispatch them immediately
  @tracked_at = monotonic_now - 10_000
  @error_contract = Tracking::Contracts::Error.new
end

Instance Method Details

#reportObject

Dispatches the current state from sampler to appropriate topics



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/karafka/web/tracking/producers/reporter.rb', line 25

def report
  MUTEX.synchronize do
    return unless report?

    @tracked_at = monotonic_now

    # Report errors that occurred (if any)
    messages = sampler.errors.map do |error|
      @error_contract.validate!(error)

      {
        topic: Karafka::Web.config.topics.errors,
        payload: error.to_json,
        # Always dispatch errors from the same process to the same partition
        key: error[:process][:id]
      }
    end

    return if messages.empty?

    produce(messages)

    # Clear the sampler so it tracks new state changes without previous once impacting
    # the data
    sampler.clear
  end
end