Class: Karafka::Web::Tracking::Consumers::Sampler

Inherits:
Sampler
  • Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/tracking/consumers/sampler.rb

Overview

Sampler that fetches and stores metrics samples about the consumer process.

Constant Summary

SCHEMA_VERSION =

Current schema version. This is used to detect incompatible changes and to avoid using outdated data during upgrades.

'1.4.0'
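
As a rough illustration of how a version string like this can guard against outdated data, the sketch below keeps only reports whose schema version matches the current one. The `EXAMPLE_SCHEMA_VERSION` constant, the `compatible?` helper, and the sample report hashes are hypothetical; this page does not show how Karafka Web actually compares versions.

```ruby
# Hypothetical illustration; not Karafka Web's actual compatibility check.
EXAMPLE_SCHEMA_VERSION = '1.4.0'

# Returns true only when the report was produced with the current schema.
def compatible?(report)
  report[:schema_version] == EXAMPLE_SCHEMA_VERSION
end

# Invented sample data: one current report and one from an older schema.
reports = [
  { schema_version: '1.4.0', type: 'consumer' },
  { schema_version: '1.3.0', type: 'consumer' }
]

# Reports written with another schema are skipped rather than parsed.
usable = reports.select { |report| compatible?(report) }

puts usable.size
```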

Instance Attribute Summary

Instance Method Summary

Methods inherited from Sampler

#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_id, #rdkafka_version, #ruby_version, #waterdrop_version

Constructor Details

#initialize ⇒ Sampler

Returns a new instance of Sampler.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 38

def initialize
  super

  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup

  @consumer_groups = Hash.new do |h, cg_id|
    h[cg_id] = {
      id: cg_id,
      subscription_groups: {}
    }
  end

  @subscription_groups = Hash.new do |h, sg_id|
    h[sg_id] = {
      id: sg_id,
      polled_at: monotonic_now
    }
  end

  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]
end
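
The two `Hash.new` blocks in the constructor rely on Ruby's default-block behaviour: the first read of a missing key builds and stores a fresh entry. A minimal standalone sketch of that pattern follows. The `'example_app'` and `'sg_0'` keys are invented, and `Process.clock_gettime` is used here only as an assumed stand-in for `monotonic_now`.

```ruby
# Standalone sketch of the lazy-default pattern used for @consumer_groups.
consumer_groups = Hash.new do |hash, cg_id|
  hash[cg_id] = { id: cg_id, subscription_groups: {} }
end

# Reading a missing key creates the entry and stores it in the hash.
group = consumer_groups['example_app']

group[:subscription_groups]['sg_0'] = {
  id: 'sg_0',
  # Assumption: a monotonic clock reading stands in for monotonic_now.
  polled_at: Process.clock_gettime(Process::CLOCK_MONOTONIC)
}

# A second read returns the very same stored object, not a new default.
same_object = consumer_groups['example_app'].equal?(group)

puts consumer_groups.keys.inspect
puts same_object
```

Because the block assigns into the hash, callers never need to check whether a group has been seen before they write to it.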

Instance Attribute Details

#consumer_groups ⇒ Object (readonly)

Returns the value of attribute consumer_groups.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def consumer_groups
  @consumer_groups
end

#counters ⇒ Object (readonly)

Returns the value of attribute counters.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def counters
  @counters
end

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def errors
  @errors
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def jobs
  @jobs
end

#pauses ⇒ Object (readonly)

Returns the value of attribute pauses.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def pauses
  @pauses
end

#subscription_groups ⇒ Object (readonly)

Returns the value of attribute subscription_groups.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def subscription_groups
  @subscription_groups
end

#windows ⇒ Object (readonly)

Returns the value of attribute windows.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def windows
  @windows
end

Instance Method Details

#clear ⇒ Object

Note:

We do not clear processing, pauses, or similar data because these track state rather than values, so they must persist between flushes.

Clears counters and errors. Used after data is reported by the reporter, to start collecting new samples.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 123

def clear
  @counters.each { |k, _| @counters[k] = 0 }

  @errors.clear
end
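
To illustrate the note above, the sketch below applies the same reset to plain local data: counters return to zero and errors are emptied, while state-like data (here a hypothetical `pauses` hash) is left alone. The counter names and all values are invented for the example.

```ruby
# Invented sample data standing in for the sampler's instance variables.
counters = { batches: 3, messages: 120, errors: 1 }
errors = [{ type: 'StandardError' }]
pauses = { 'topic-0' => { paused_till: 'example' } }

# Same reset as in #clear: zero every counter while keeping its key.
counters.each { |key, _| counters[key] = 0 }
errors.clear

# pauses is deliberately untouched: it describes ongoing state.
puts counters.inspect
puts errors.empty?
puts pauses.size
```

Resetting values in place keeps the set of counter keys stable, so each report carries the same fields even when nothing happened in the interval.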

#sample ⇒ Object

Note:

This should run before any mutex is acquired, because these operations may invoke shell commands and other threads should be able to continue in the meantime.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 131

def sample
  memory_threads_ps

  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  @threads = threads
end
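
The ordering described in the note can be sketched as "measure first, lock second". In the example below, `slow_probe` is a hypothetical stand-in for a shell-based measurement and `lock` for whatever mutex the caller uses; neither name comes from Karafka Web.

```ruby
lock = Mutex.new
samples = {}

# Hypothetical slow measurement, e.g. something that shells out.
slow_probe = lambda do
  sleep(0.01)
  1024
end

# 1. Take the slow sample without holding the lock,
#    so other threads are not blocked while it runs.
samples[:memory_usage] = slow_probe.call

# 2. Hold the lock only for the cheap step that reads the stored value.
report = lock.synchronize { { memory_usage: samples[:memory_usage] } }

puts report.inspect
```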

#to_report ⇒ Hash

Returns report hash with all the details about consumer operations.

Returns:

  • (Hash)

    report hash with all the details about consumer operations



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 76

def to_report
  {
    schema_version: SCHEMA_VERSION,
    type: 'consumer',
    dispatched_at: float_now,

    process: {
      id: process_id,
      started_at: started_at,
      status: ::Karafka::App.config.internal.status.to_s,
      execution_mode: ::Karafka::Server.execution_mode.to_s,
      listeners: listeners,
      workers: workers,
      memory_usage: @memory_usage,
      memory_total_usage: @memory_total_usage,
      memory_size: memory_size,
      cpus: cpus,
      threads: threads,
      cpu_usage: @cpu_usage,
      tags: Karafka::Process.tags,
      bytes_received: bytes_received,
      bytes_sent: bytes_sent
    },

    versions: {
      ruby: ruby_version,
      karafka: karafka_version,
      karafka_core: karafka_core_version,
      karafka_web: karafka_web_version,
      waterdrop: waterdrop_version,
      rdkafka: rdkafka_version,
      librdkafka: librdkafka_version
    },

    stats: jobs_queue_statistics.merge(
      utilization: utilization
    ).merge(total: @counters),

    consumer_groups: enriched_consumer_groups,
    jobs: jobs.values
  }
end
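
The `stats` entry above is built by chaining two `Hash#merge` calls. The sketch below shows the resulting shape with plain data. The keys inside `jobs_queue_statistics` and `counters`, and all numbers, are hypothetical, since this page does not list the real ones.

```ruby
# Hypothetical inputs; the real keys are not shown on this page.
jobs_queue_statistics = { busy: 2, enqueued: 5 }
utilization = 37.5
counters = { batches: 10, messages: 250 }

# Hash#merge returns a new hash, so the inputs are left unmodified.
stats = jobs_queue_statistics
        .merge(utilization: utilization)
        .merge(total: counters)

puts stats.keys.inspect
puts stats[:total][:messages]
```

Note that `merge` does not copy nested values: `stats[:total]` is the same object as `counters`.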

#track ⇒ Object

We cannot report and track at the same time, which is why we use a mutex here: it ensures that sample aggregation and counting do not interfere with reporter flushing.



# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 69

def track
  Reporter::MUTEX.synchronize do
    yield(self)
  end
end
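
A minimal sketch of how a caller might use a `track`-style method is shown below. `ExampleReporter` and `ExampleSampler` are hypothetical stand-ins that only mirror the shape of the code above; they are not the Karafka Web classes.

```ruby
# Hypothetical stand-in for the module that owns the shared mutex.
module ExampleReporter
  MUTEX = Mutex.new
end

# Hypothetical stand-in mirroring the shape of #track.
class ExampleSampler
  attr_reader :counters

  def initialize
    @counters = { messages: 0 }
  end

  # Yields self while holding the shared mutex.
  def track
    ExampleReporter::MUTEX.synchronize { yield(self) }
  end
end

sampler = ExampleSampler.new

# Several threads update the counter; each update runs under the mutex.
threads = 4.times.map do
  Thread.new do
    100.times { sampler.track { |s| s.counters[:messages] += 1 } }
  end
end
threads.each(&:join)

puts sampler.counters[:messages]
```

Since Ruby's `Mutex` is not reentrant, a block passed to `track` in this sketch must not call `track` again from the same thread.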