Class: Karafka::Web::Tracking::Consumers::Sampler
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/tracking/consumers/sampler.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/os.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/jobs.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/server.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/network.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/container.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/consumer_groups.rb
Overview
Sampler for fetching and storing metrics samples about the consumer process
Defined Under Namespace
Modules: Enrichers, Metrics
Constant Summary
- SCHEMA_VERSION = '1.5.0'
Current schema version. Used to detect incompatible changes so that outdated data is not used during upgrades.
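As a sketch of how such a schema version can be checked, the snippet below uses Ruby's built-in Gem::Version to compare a report's schema version against a supported baseline. The compatible_report? helper is hypothetical, for illustration only; it is not part of Karafka Web's API.

```ruby
# Hypothetical helper (not part of Karafka Web): accept a report only when its
# schema_version is at least the version this process understands.
# Gem::Version handles multi-segment versions correctly ('1.10.0' > '1.9.0').
def compatible_report?(report, supported = '1.5.0')
  Gem::Version.new(report[:schema_version]) >= Gem::Version.new(supported)
end

puts compatible_report?(schema_version: '1.5.0') # => true
puts compatible_report?(schema_version: '1.4.0') # => false
```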
Instance Attribute Summary
-
#consumer_groups ⇒ Object
readonly
Returns the value of attribute consumer_groups.
-
#counters ⇒ Object
readonly
Returns the value of attribute counters.
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#pauses ⇒ Object
readonly
Returns the value of attribute pauses.
-
#subscription_groups ⇒ Object
readonly
Returns the value of attribute subscription_groups.
-
#windows ⇒ Object
readonly
Returns the value of attribute windows.
Instance Method Summary
-
#clear ⇒ Object
Clears counters and errors.
-
#initialize ⇒ Sampler
constructor
A new instance of Sampler.
-
#sample ⇒ Object
-
#to_report ⇒ Hash
Report hash with all the details about consumer operations.
-
#track ⇒ Object
We cannot report and track at the same time; that is why we use a mutex here.
Methods inherited from Sampler
#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_id, #rdkafka_version, #ruby_version, #waterdrop_version
Constructor Details
#initialize ⇒ Sampler
Returns a new instance of Sampler.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 38

def initialize
  super
  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup
  @consumer_groups = Hash.new do |h, cg_id|
    h[cg_id] = {
      id: cg_id,
      subscription_groups: {}
    }
  end
  @subscription_groups = Hash.new do |h, sg_id|
    h[sg_id] = {
      id: sg_id,
      polled_at: monotonic_now,
      topics: Hash.new do |h1, topic|
        h1[topic] = Hash.new do |h2, partition|
          # We track those details in case we need to fill statistical gaps for
          # transactional consumers
          h2[partition] = {
            seek_offset: -1,
            transactional: false
          }
        end
      end
    }
  end
  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]
  # Select and instantiate appropriate system metrics collector based on environment
  # Use container-aware collector if cgroups are available, otherwise use OS-based
  metrics_class = if Metrics::Container.active?
    Metrics::Container
  else
    Metrics::Os
  end
  @system_metrics = metrics_class.new(@shell)
  @network_metrics = Metrics::Network.new(@windows)
  @server_metrics = Metrics::Server.new
end
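The nested Hash.new blocks above auto-vivify per-group, per-topic, and per-partition state on first access. The standalone sketch below reproduces that pattern in isolation (simplified: the polled_at timestamp is omitted, and the names are illustrative):

```ruby
# Auto-vivifying nested hashes: accessing a missing key builds its default
# structure via the Hash.new block, recursively for each nesting level.
subscription_groups = Hash.new do |h, sg_id|
  h[sg_id] = {
    id: sg_id,
    topics: Hash.new do |h1, topic|
      h1[topic] = Hash.new do |h2, partition|
        h2[partition] = { seek_offset: -1, transactional: false }
      end
    end
  }
end

# First access to any group / topic / partition materializes the defaults:
state = subscription_groups['sg_1'][:topics]['orders'][0]
puts state.inspect # => {:seek_offset=>-1, :transactional=>false}
```

This keeps the tracking code free of explicit "initialize if missing" checks: callers can write straight into the deepest level.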
Instance Attribute Details
#consumer_groups ⇒ Object (readonly)
Returns the value of attribute consumer_groups.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def consumer_groups
  @consumer_groups
end
#counters ⇒ Object (readonly)
Returns the value of attribute counters.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def counters
  @counters
end
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def errors
  @errors
end
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def jobs
  @jobs
end
#pauses ⇒ Object (readonly)
Returns the value of attribute pauses.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def pauses
  @pauses
end
#subscription_groups ⇒ Object (readonly)
Returns the value of attribute subscription_groups.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def subscription_groups
  @subscription_groups
end
#windows ⇒ Object (readonly)
Returns the value of attribute windows.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def windows
  @windows
end
Instance Method Details
#clear ⇒ Object
We do not clear processing, pauses, or similar structures because for those we track states rather than values, so they need to persist between flushes.
Clears counters and errors. Used after data is reported by the reporter, so collection of new samples can start fresh.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 144

def clear
  @counters.each { |k, _| @counters[k] = 0 }
  @errors.clear
end
#sample ⇒ Object
This should run before any mutex is taken, so other threads can continue while these operations (which may invoke shell commands) are in progress.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 152

def sample
  memory_threads_ps
  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  @threads = threads
end
#to_report ⇒ Hash
Returns report hash with all the details about consumer operations.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 97

def to_report
  {
    schema_version: SCHEMA_VERSION,
    type: 'consumer',
    dispatched_at: float_now,

    process: {
      id: process_id,
      started_at: started_at,
      status: ::Karafka::App.config.internal.status.to_s,
      execution_mode: ::Karafka::Server.execution_mode.to_s,
      listeners: @server_metrics.listeners,
      workers: workers,
      memory_usage: @memory_usage,
      memory_total_usage: @memory_total_usage,
      memory_size: memory_size,
      cpus: cpus,
      threads: threads,
      cpu_usage: @cpu_usage,
      tags: Karafka::Process.tags,
      bytes_received: @network_metrics.bytes_received,
      bytes_sent: @network_metrics.bytes_sent
    },

    versions: {
      ruby: ruby_version,
      karafka: karafka_version,
      karafka_core: karafka_core_version,
      karafka_web: karafka_web_version,
      waterdrop: waterdrop_version,
      rdkafka: rdkafka_version,
      librdkafka: librdkafka_version
    },

    stats: jobs_metrics.jobs_queue_statistics.merge(
      utilization: jobs_metrics.utilization
    ).merge(total: @counters),

    consumer_groups: enriched_consumer_groups,
    jobs: jobs.values
  }
end
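The stats entry above is built by chaining Hash#merge calls over three sources. The sketch below shows how that composition behaves, using assumed placeholder values (the real queue statistics shape comes from the jobs metrics collector, not from this example):

```ruby
# Placeholder inputs standing in for jobs_queue_statistics, utilization and
# @counters; the keys here are assumptions for illustration only.
queue_stats = { busy: 2, enqueued: 5 }
utilization = 35.4
counters    = { batches: 10, messages: 250 }

# Hash#merge returns a new hash each time, so the chain never mutates inputs.
stats = queue_stats.merge(utilization: utilization).merge(total: counters)
puts stats.inspect
# => {:busy=>2, :enqueued=>5, :utilization=>35.4, :total=>{:batches=>10, :messages=>250}}
```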
#track ⇒ Object
We cannot report and track at the same time; that is why we use a mutex here: to make sure that sample aggregation and counting does not interleave with reporter flushing.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 90

def track
  Reporter::MUTEX.synchronize do
    yield(self)
  end
end
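The mutual-exclusion pattern behind #track can be sketched in isolation. The MiniSampler class below is illustrative only (it is not Karafka Web's internal implementation): aggregation and flushing synchronize on the same Mutex, so a flush can never observe a half-applied tracking update.

```ruby
# Minimal sketch: tracking and flushing share one Mutex, so they never overlap.
class MiniSampler
  MUTEX = Mutex.new

  def initialize
    @counters = Hash.new(0)
  end

  # Aggregation runs under the mutex, yielding the sampler to the caller...
  def track
    MUTEX.synchronize { yield(self) }
  end

  def count(key)
    @counters[key] += 1
  end

  # ...and so does flushing, which snapshots and resets the counters.
  def flush
    MUTEX.synchronize do
      snapshot = @counters.dup
      @counters.clear
      snapshot
    end
  end
end

sampler = MiniSampler.new
sampler.track { |s| s.count(:batches) }
sampler.track { |s| s.count(:batches) }
puts sampler.flush.inspect # => {:batches=>2}
```

Yielding self inside the synchronized block mirrors #track above: callers mutate the sampler's state through the block while holding the lock, without managing the mutex themselves.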