Class: Karafka::Web::Tracking::Consumers::Reporter

Inherits:
Reporter
  • Object
show all
Defined in:
lib/karafka/web/tracking/consumers/reporter.rb

Overview

Reports the collected data about the process and sends it, so we can use it in the UI

Constant Summary collapse

MUTEX =

This mutex is shared between tracker and samplers so there is no case where metrics would be collected same time tracker reports

Mutex.new

Instance Method Summary collapse

Constructor Details

#initializeReporter

Returns a new instance of Reporter.



20
21
22
23
24
25
26
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 20

def initialize
  super
  # Move back so first report is dispatched fast to indicate, that the process is alive
  @tracked_at = monotonic_now - 10_000
  @report_contract = Consumers::Contracts::Report.new
  @error_contract = Tracking::Contracts::Error.new
end

Instance Method Details

#active?Boolean

We never report in initializing phase because things are not yet fully configured We never report in the initialized because server is not yet ready until Karafka is fully running and some of the things like listeners are not yet available

This method will also be false in case we are not running in karafka server or in embedding, because in those cases Karafka does not go beyond the initialized phase

Returns:

  • (Boolean)

    are we able to report consumer state



36
37
38
39
40
41
42
43
44
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 36

def active?
  # If we do not have a producer that we could use to report or it was closed, we cannot
  # and should not report
  return false unless super
  return false if ::Karafka::App.initializing?
  return false if ::Karafka::App.initialized?

  true
end

#report(forced: false) ⇒ Object

Dispatches the current state from sampler to appropriate topics

Parameters:

  • forced (Boolean) (defaults to: false)

    should we report bypassing the time frequency or should we report only in case we would not send the report for long enough time.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 50

def report(forced: false)
  # Do not even mutex if not needed
  return unless report?(forced)

  # We run this sampling before the mutex so sampling does not stop things in case
  # other threads would need this mutex. This can take up to 25ms and we do not want to
  # block during this time
  sampler.sample

  MUTEX.synchronize do
    return unless report?(forced)

    @tracked_at = monotonic_now

    report = sampler.to_report

    @report_contract.validate!(report)

    process_name = report[:process][:name]

    # Report consumers statuses
    messages = [
      {
        topic: ::Karafka::Web.config.topics.consumers.reports,
        payload: Zlib::Deflate.deflate(report.to_json),
        key: process_name,
        partition: 0,
        headers: { 'zlib' => 'true' }
      }
    ]

    # Report errors that occurred (if any)
    messages += sampler.errors.map do |error|
      @error_contract.validate!(error)

      {
        topic: Karafka::Web.config.topics.errors,
        payload: Zlib::Deflate.deflate(error.to_json),
        # Always dispatch errors from the same process to the same partition
        key: process_name,
        headers: { 'zlib' => 'true' }
      }
    end

    produce(messages)

    # Clear the sampler so it tracks new state changes without previous once impacting
    # the data
    sampler.clear
  end
end

#report!Object

Reports bypassing frequency check. This can be used to report when state changes in the process drastically. For example when process is stopping, we want to indicate this as fast as possible in the UI, etc.



105
106
107
# File 'lib/karafka/web/tracking/consumers/reporter.rb', line 105

def report!
  report(forced: true)
end