Class: Karafka::Web::Tracking::Consumers::Sampler

Inherits:
Sampler
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/tracking/consumers/sampler.rb

Overview

Samples for fetching and storing metrics samples about the consumer process

Constant Summary collapse

SCHEMA_VERSION =

Current schema version This is used for detecting incompatible changes and not using outdated data during upgrades

'1.2.9'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Sampler

#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_name, #rdkafka_version, #ruby_version, #waterdrop_version

Constructor Details

#initializeSampler

Returns a new instance of Sampler.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 36

def initialize
  super

  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup
  @consumer_groups = Hash.new do |h, cg_id|
    h[cg_id] = {
      id: cg_id,
      subscription_groups: {}
    }
  end
  @subscription_groups = {}
  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]
end

Instance Attribute Details

#consumer_groupsObject (readonly)

Returns the value of attribute consumer_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def consumer_groups
  @consumer_groups
end

#countersObject (readonly)

Returns the value of attribute counters.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def counters
  @counters
end

#errorsObject (readonly)

Returns the value of attribute errors.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def errors
  @errors
end

#jobsObject (readonly)

Returns the value of attribute jobs.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def jobs
  @jobs
end

#pausesObject (readonly)

Returns the value of attribute pauses.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def pauses
  @pauses
end

#subscription_groupsObject (readonly)

Returns the value of attribute subscription_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def subscription_groups
  @subscription_groups
end

#windowsObject (readonly)

Returns the value of attribute windows.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def windows
  @windows
end

Instance Method Details

#clearObject

Note:

We do not clear processing or pauses or other things like this because we track their states and not values, so they need to be tracked between flushes.

Clears counters and errors. Used after data is reported by reported to start collecting new samples



112
113
114
115
116
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 112

def clear
  @counters.each { |k, _| @counters[k] = 0 }

  @errors.clear
end

#sampleObject

Note:

This should run before any mutex, so other threads can continue as those operations may invoke shell commands



120
121
122
123
124
125
126
127
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 120

def sample
  memory_threads_ps

  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  @threads = threads
end

#to_reportHash

Returns report hash with all the details about consumer operations.

Returns:

  • (Hash)

    report hash with all the details about consumer operations



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 66

def to_report
  {
    schema_version: SCHEMA_VERSION,
    type: 'consumer',
    dispatched_at: float_now,

    process: {
      started_at: started_at,
      name: process_name,
      status: ::Karafka::App.config.internal.status.to_s,
      listeners: listeners,
      workers: workers,
      memory_usage: @memory_usage,
      memory_total_usage: @memory_total_usage,
      memory_size: memory_size,
      cpus: cpus,
      threads: threads,
      cpu_usage: @cpu_usage,
      tags: Karafka::Process.tags,
      bytes_received: bytes_received,
      bytes_sent: bytes_sent
    },

    versions: {
      ruby: ruby_version,
      karafka: karafka_version,
      karafka_core: karafka_core_version,
      karafka_web: karafka_web_version,
      waterdrop: waterdrop_version,
      rdkafka: rdkafka_version,
      librdkafka: librdkafka_version
    },

    stats: jobs_queue_statistics.merge(
      utilization: utilization
    ).merge(total: @counters),

    consumer_groups: enriched_consumer_groups,
    jobs: jobs.values
  }
end

#trackObject

We cannot report and track the same time, that is why we use mutex here. To make sure that samples aggregations and counting does not interact with reporter flushing.



59
60
61
62
63
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 59

def track
  Reporter::MUTEX.synchronize do
    yield(self)
  end
end