Class: Karafka::Web::Tracking::Consumers::Sampler
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/tracking/consumers/sampler.rb
Overview
Sampler for fetching and storing metrics samples about the consumer process
Constant Summary
- SCHEMA_VERSION =
Current schema version. This is used for detecting incompatible changes and not using outdated data during upgrades.
'1.4.0'
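To illustrate the upgrade guard described above, here is a minimal sketch of how code consuming reports might skip payloads built against an older schema. The compatible_report? helper and the comparison rule are assumptions for illustration, not part of karafka-web.

# Hypothetical helper: shown only to illustrate how SCHEMA_VERSION
# could be used to detect outdated report payloads.
def compatible_report?(report)
  reported = Gem::Version.new(report[:schema_version].to_s)
  current  = Gem::Version.new(
    Karafka::Web::Tracking::Consumers::Sampler::SCHEMA_VERSION
  )

  reported >= current
end

compatible_report?({ schema_version: '1.4.0' }) # => true
compatible_report?({ schema_version: '1.3.0' }) # => false under this assumed rule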
Instance Attribute Summary
-
#consumer_groups ⇒ Object
readonly
Returns the value of attribute consumer_groups.
-
#counters ⇒ Object
readonly
Returns the value of attribute counters.
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#pauses ⇒ Object
readonly
Returns the value of attribute pauses.
-
#subscription_groups ⇒ Object
readonly
Returns the value of attribute subscription_groups.
-
#windows ⇒ Object
readonly
Returns the value of attribute windows.
Instance Method Summary
-
#clear ⇒ Object
Clears counters and errors.
-
#initialize ⇒ Sampler
constructor
A new instance of Sampler.
- #sample ⇒ Object
-
#to_report ⇒ Hash
Report hash with all the details about consumer operations.
-
#track ⇒ Object
We cannot report and track at the same time, which is why we use a mutex here.
Methods inherited from Sampler
#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_id, #rdkafka_version, #ruby_version, #waterdrop_version
Constructor Details
#initialize ⇒ Sampler
Returns a new instance of Sampler.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 38

def initialize
  super

  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup

  @consumer_groups = Hash.new do |h, cg_id|
    h[cg_id] = {
      id: cg_id,
      subscription_groups: {}
    }
  end

  @subscription_groups = Hash.new do |h, sg_id|
    h[sg_id] = {
      id: sg_id,
      polled_at: monotonic_now
    }
  end

  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]
end
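A note on the two Hash.new blocks above: they auto-create entries on first access, so tracking code can reference a consumer or subscription group by id without checking for its presence. A minimal sketch, assuming the sampler can be instantiated directly and using made-up group ids:

sampler = Karafka::Web::Tracking::Consumers::Sampler.new

# First access materializes the default structure defined in the constructor
sampler.consumer_groups['example_cg']
# => { id: 'example_cg', subscription_groups: {} }

sampler.subscription_groups['example_sg']
# => { id: 'example_sg', polled_at: <monotonic timestamp of first access> }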
Instance Attribute Details
#consumer_groups ⇒ Object (readonly)
Returns the value of attribute consumer_groups.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def consumer_groups
  @consumer_groups
end
#counters ⇒ Object (readonly)
Returns the value of attribute counters.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def counters
  @counters
end
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def errors
  @errors
end
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def jobs
  @jobs
end
#pauses ⇒ Object (readonly)
Returns the value of attribute pauses.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def pauses
  @pauses
end
#subscription_groups ⇒ Object (readonly)
Returns the value of attribute subscription_groups.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def subscription_groups
  @subscription_groups
end
#windows ⇒ Object (readonly)
Returns the value of attribute windows.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

def windows
  @windows
end
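Although these accessors are read-only, the collections they expose are mutable and are updated in place by the tracking listeners (usually via #track, documented below). A hedged illustration with made-up keys:

sampler = Karafka::Web::Tracking::Consumers::Sampler.new

sampler.counters           # => zeroed counters hash copied from COUNTERS_BASE
sampler.errors             # => []
sampler.jobs['job-1'] = {} # hypothetical job id; listeners register the real ones
sampler.jobs.size          # => 1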
Instance Method Details
#clear ⇒ Object
We do not clear processing or pauses or other things like this because we track their states and not values, so they need to be tracked between flushes.
Clears counters and errors. Used after data is reported by the reporter to start collecting new samples.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 123

def clear
  @counters.each { |k, _| @counters[k] = 0 }

  @errors.clear
end
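Worth noting from the body above: counter values are reset in place, so the counter keys survive between flushes, while the collected error samples are dropped entirely. A short hedged sketch (the :batches key is an assumption about COUNTERS_BASE):

sampler.track { |s| s.counters[:batches] += 5 }

sampler.clear

sampler.counters[:batches] # => 0 (key retained, value reset)
sampler.errors             # => [] (error samples discarded)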
#sample ⇒ Object
This should run before acquiring any mutex, so other threads can continue, as those operations may invoke shell commands.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 131

def sample
  memory_threads_ps

  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  @threads = threads
end
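Since the memory, CPU and thread readings may shell out to the OS, a reporting loop would typically refresh them with #sample before taking the reporter mutex and only then aggregate and build the report. A hedged sketch of such a loop; the interval and the dispatch step are assumptions:

loop do
  sampler.sample # may invoke shell commands; runs without holding the mutex

  sampler.track do |s|
    report = s.to_report
    # ... dispatch the report somewhere, then reset the sampling window ...
    s.clear
  end

  sleep(5) # hypothetical reporting interval
end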
#to_report ⇒ Hash
Returns report hash with all the details about consumer operations.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 76

def to_report
  {
    schema_version: SCHEMA_VERSION,
    type: 'consumer',
    dispatched_at: float_now,

    process: {
      id: process_id,
      started_at: started_at,
      status: ::Karafka::App.config.internal.status.to_s,
      execution_mode: ::Karafka::Server.execution_mode.to_s,
      listeners: listeners,
      workers: workers,
      memory_usage: @memory_usage,
      memory_total_usage: @memory_total_usage,
      memory_size: memory_size,
      cpus: cpus,
      threads: threads,
      cpu_usage: @cpu_usage,
      tags: Karafka::Process.tags,
      bytes_received: bytes_received,
      bytes_sent: bytes_sent
    },

    versions: {
      ruby: ruby_version,
      karafka: karafka_version,
      karafka_core: karafka_core_version,
      karafka_web: karafka_web_version,
      waterdrop: waterdrop_version,
      rdkafka: rdkafka_version,
      librdkafka: librdkafka_version
    },

    stats: jobs_queue_statistics.merge(
      utilization: utilization
    ).merge(total: @counters),

    consumer_groups: enriched_consumer_groups,
    jobs: jobs.values
  }
end
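The keys below mirror the hash literal in the source. A hedged sketch of inspecting a built report, assuming it is generated inside a running Karafka consumer process where these process-level readings are available:

report = sampler.to_report

report[:schema_version]          # => '1.4.0'
report[:process][:memory_usage]  # value captured by the last #sample call
report[:versions][:librdkafka]   # librdkafka version string
report[:stats][:utilization]     # merged into the jobs queue statistics
report[:consumer_groups]         # enriched consumer groups structure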
#track ⇒ Object
We cannot report and track at the same time, which is why we use a mutex here: to make sure that sample aggregation and counting does not interact with reporter flushing.
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 69

def track
  Reporter::MUTEX.synchronize do
    yield(self)
  end
end
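A hedged usage sketch: a tracking listener bumping a counter and storing an error sample through #track. The counter key and the error payload shape are assumptions for illustration, not the exact structures karafka-web uses.

sampler.track do |s|
  s.counters[:batches] += 1
  s.errors << { type: 'consumer.consume.error', occurred_at: Time.now.to_f }
end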