Class: Karafka::Web::Tracking::Consumers::Sampler

Inherits:
Sampler
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/tracking/consumers/sampler.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/os.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/jobs.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/server.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/network.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/container.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/consumer_groups.rb

Overview

Sampler for fetching and storing metrics samples about the consumer process.

Defined Under Namespace

Modules: Enrichers, Metrics

Constant Summary collapse

SCHEMA_VERSION =

Current schema version. This is used for detecting incompatible changes and for not using outdated data during upgrades.

'1.5.0'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Sampler

#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_id, #rdkafka_version, #ruby_version, #waterdrop_version

Constructor Details

#initialize ⇒ Sampler

Returns a new instance of Sampler.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 38

# Sets up all the state used for tracking: TTL windows, counters, lazily populated
# consumer and subscription groups registries, errors, pauses, jobs, last known
# memory / CPU readings and the metrics collectors.
def initialize
  super

  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup

  # Unknown consumer group ids get an empty entry created on first access
  @consumer_groups = Hash.new do |groups, group_id|
    groups[group_id] = { id: group_id, subscription_groups: {} }
  end

  # Unknown subscription group ids get an entry created on first access. The same
  # applies to topics within a subscription group and partitions within a topic.
  @subscription_groups = Hash.new do |groups, group_id|
    groups[group_id] = {
      id: group_id,
      polled_at: monotonic_now,
      topics: Hash.new do |topics, topic_name|
        topics[topic_name] = Hash.new do |partitions, partition_id|
          # Those details are tracked in case statistical gaps need to be filled for
          # transactional consumers
          partitions[partition_id] = { seek_offset: -1, transactional: false }
        end
      end
    }
  end

  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]

  # Pick the system metrics collector matching the environment: the container-aware
  # one when it reports itself as active, the OS-based one otherwise
  collector_class = Metrics::Container.active? ? Metrics::Container : Metrics::Os

  @system_metrics = collector_class.new(@shell)
  @network_metrics = Metrics::Network.new(@windows)
  @server_metrics = Metrics::Server.new
end

Instance Attribute Details

#consumer_groups ⇒ Object (readonly)

Returns the value of attribute consumer_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the consumer groups registry created in the constructor.
#
# @return [Hash] hash keyed by consumer group id; accessing an unknown id creates and
#   stores an entry of the form `{ id: cg_id, subscription_groups: {} }`
def consumer_groups
  @consumer_groups
end

#counters ⇒ Object (readonly)

Returns the value of attribute counters.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the counters collected between flushes.
#
# The object starts as a copy of `COUNTERS_BASE`, has every value reset to 0 by
# `#clear` and is published under `stats[:total]` by `#to_report`.
#
# @return [Object] the counters container (NOTE(review): presumably a Hash of numeric
#   counters - confirm against `COUNTERS_BASE`)
def counters
  @counters
end

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the errors buffer.
#
# @return [Array] array that starts empty in the constructor and is emptied in place
#   by `#clear`
def errors
  @errors
end

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the jobs registry.
#
# @return [Hash] hash that starts empty in the constructor; its values are what
#   `#to_report` publishes under the `:jobs` key. It is not touched by `#clear`.
def jobs
  @jobs
end

#pauses ⇒ Object (readonly)

Returns the value of attribute pauses.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the pauses registry.
#
# @return [Hash] hash that starts empty in the constructor. It is not touched by
#   `#clear`, so its content is kept between flushes.
def pauses
  @pauses
end

#subscription_groups ⇒ Object (readonly)

Returns the value of attribute subscription_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the subscription groups registry created in the constructor.
#
# @return [Hash] hash keyed by subscription group id; accessing an unknown id creates
#   and stores an entry with `:id`, `:polled_at` (monotonic time of the entry creation)
#   and `:topics`. `:topics` is a hash of topic => partition => details, where details
#   of an unknown partition default to `{ seek_offset: -1, transactional: false }`.
def subscription_groups
  @subscription_groups
end

#windows ⇒ Object (readonly)

Returns the value of attribute windows.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# Reader for the TTL windows object.
#
# @return [Helpers::Ttls::Windows] instance created in the constructor; the same
#   instance is handed to the network metrics collector
def windows
  @windows
end

Instance Method Details

#clear ⇒ Object

Note:

We do not clear processing or pauses or other things like this because we track their states and not values, so they need to be tracked between flushes.

Clears counters and errors. Used after data is reported by the reporter to start collecting new samples.



144
145
146
147
148
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 144

# Resets every counter to zero and empties the errors buffer, both in place.
#
# @return [Array] the (now empty) errors array
def clear
  # Only values of already existing keys are overwritten, so the set of counters stays
  # intact
  @counters.each_key do |counter_name|
    @counters[counter_name] = 0
  end

  @errors.clear
end

#sample ⇒ Object

Note:

This should run before any mutex, so other threads can continue as those operations may invoke shell commands



152
153
154
155
156
157
158
159
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 152

# Takes fresh readings of memory usage, total memory usage, CPU usage and threads and
# stores them in instance variables.
#
# @return [Object] the value assigned to `@threads`
def sample
  # Invoked before any reading below is taken
  # NOTE(review): presumably this primes data shared by the helpers that follow -
  # confirm against its definition before reordering anything here
  memory_threads_ps

  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  # NOTE(review): `#to_report` calls `threads` directly instead of reading `@threads`,
  # unlike the three readings above - confirm whether this is intended
  @threads = threads
end

#to_report ⇒ Hash

Returns report hash with all the details about consumer operations.

Returns:

  • (Hash)

    report hash with all the details about consumer operations



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 97

# Builds the report describing the current state of this consumer process.
#
# The report consists of the schema version, report type, dispatch time, process
# details, versions of the libraries in use, jobs statistics (with the counters under
# `:total`), consumer groups details and the tracked jobs.
#
# @return [Hash] report hash with all the details about consumer operations
def to_report
  dispatch_time = float_now

  process_details = {
    id: process_id,
    started_at: started_at,
    status: ::Karafka::App.config.internal.status.to_s,
    execution_mode: ::Karafka::Server.execution_mode.to_s,
    listeners: @server_metrics.listeners,
    workers: workers,
    memory_usage: @memory_usage,
    memory_total_usage: @memory_total_usage,
    memory_size: memory_size,
    cpus: cpus,
    threads: threads,
    cpu_usage: @cpu_usage,
    tags: Karafka::Process.tags,
    bytes_received: @network_metrics.bytes_received,
    bytes_sent: @network_metrics.bytes_sent
  }

  versions_details = {
    ruby: ruby_version,
    karafka: karafka_version,
    karafka_core: karafka_core_version,
    karafka_web: karafka_web_version,
    waterdrop: waterdrop_version,
    rdkafka: rdkafka_version,
    librdkafka: librdkafka_version
  }

  # Jobs queue statistics extended first with the utilization and then with the
  # counters
  queue_statistics = jobs_metrics.jobs_queue_statistics
  with_utilization = queue_statistics.merge(utilization: jobs_metrics.utilization)
  stats_details = with_utilization.merge(total: @counters)

  {
    schema_version: SCHEMA_VERSION,
    type: 'consumer',
    dispatched_at: dispatch_time,
    process: process_details,
    versions: versions_details,
    stats: stats_details,
    consumer_groups: enriched_consumer_groups,
    jobs: jobs.values
  }
end

#track ⇒ Object

We cannot report and track at the same time, which is why we use a mutex here: it makes sure that sample aggregation and counting do not interfere with reporter flushing.



90
91
92
93
94
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 90

# Yields this sampler to the given block while holding `Reporter::MUTEX`, so the code
# in the block does not run concurrently with anything else synchronized on that mutex.
#
# @yieldparam sampler [Sampler] this sampler instance
# @return [Object] whatever the block returns
def track
  Reporter::MUTEX.synchronize { yield(self) }
end