Class: Karafka::Web::Ui::Models::Metrics::Charts::Topics

Inherits:
Object
Defined in:
lib/karafka/web/ui/models/metrics/charts/topics.rb

Overview

Model that prepares chart data about topic states.

Constructor Details

#initialize(topics_data, period) ⇒ Topics

Returns a new instance of Topics.

Parameters:

  • topics_data (Hash)

    aggregated topics metrics data, keyed by period

  • period (Symbol)

    period we are interested in; it is looked up in topics_data with fetch, so it must be present as a key



# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 13

def initialize(topics_data, period)
  @data = topics_data.to_h.fetch(period)
end
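
The constructor only selects the series for one period. As a minimal sketch of what that implies for the input, the snippet below mirrors the constructor body in a standalone helper. The helper name `select_period`, the `:days` period, the `topic[consumer_group]` key format and all sample values are assumptions made for illustration; they are not taken from the Karafka Web documentation.

```ruby
# Hypothetical sample input: period => { "topic[consumer_group]" => [[time, metrics], ...] }
topics_data = {
  days: {
    'orders[app_group]' => [
      [1_700_000_000, { lag_hybrid: 5, pace: 100, ls_offset_fd: 0 }],
      [1_700_000_060, { lag_hybrid: 7, pace: 130, ls_offset_fd: 2_400 }]
    ]
  }
}

# Mirrors the constructor body: keep only the series for a single period
def select_period(topics_data, period)
  topics_data.to_h.fetch(period)
end

selected = select_period(topics_data, :days)

# Hash#fetch without a default raises KeyError for an unknown period
missing = begin
  select_period(topics_data, :months)
rescue KeyError => e
  e.class
end
```

Because `fetch` is used rather than `[]`, an unknown period fails loudly instead of leaving `@data` as `nil`.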

Instance Method Details

#lags_hybrid ⇒ String

Returns JSON with the lag of each topic plus the total lag of all topics across all consumer groups.

Returns:

  • (String)

    JSON with the lag of each topic plus the total lag of all topics across all consumer groups. The total is stored under the 'total sum' key.



# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 19

def lags_hybrid
  total = Hash.new { |h, v| h[v] = 0 }

  @data.to_h.each_value do |metrics|
    metrics.each do |metric|
      time = metric.first
      lag_hybrid = metric.last[:lag_hybrid]

      if lag_hybrid
        total[time] ||= 0
        total[time] += lag_hybrid
      else
        next if total.key?(time)

        total[time] = nil
      end
    end
  end

  # Extract the lag stored only from all the data
  per_topic = @data.to_h.map do |topic, metrics|
    extracted = metrics.map { |metric| [metric.first, metric.last[:lag_hybrid]] }

    [topic, extracted]
  end.to_h

  # We name it with a space because someone may have a topic called "total" and we
  # want to avoid collisions
  per_topic.merge('total sum' => total.to_a).to_json
end
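
The summing rule above treats a missing lag as "unknown" rather than zero: a timestamp stays `nil` in the total until at least one series reports a value for it. The following standalone sketch reproduces that rule outside the class. The method name `total_lag` and the sample data are hypothetical, and `data` stands in for `@data`.

```ruby
require 'json'

# Standalone sketch of the summing rule; `data` plays the role of @data
def total_lag(data)
  total = {}

  data.each_value do |metrics|
    metrics.each do |time, values|
      lag = values[:lag_hybrid]

      if lag
        # A known lag always adds up, even if the slot was nil before
        total[time] = (total[time] || 0) + lag
      elsif !total.key?(time)
        # No value seen yet for this time: mark it as unknown
        total[time] = nil
      end
    end
  end

  total
end

data = {
  'orders[group_a]' => [[100, { lag_hybrid: 5 }], [160, { lag_hybrid: nil }]],
  'orders[group_b]' => [[100, { lag_hybrid: 2 }], [160, {}]],
  'events[group_a]' => [[100, {}], [160, {}]]
}

totals = total_lag(data)
payload = { 'total sum' => totals.to_a }.to_json
```

With this input the total for time 100 is 7, while time 160 stays `nil` (serialized as `null`) because no series reported a lag for it.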

#max_lso_time ⇒ String

Returns JSON with the highest LSO freeze duration per topic, in seconds. Useful for debugging issues caused by hanging transactions.

Returns:

  • (String)

    JSON with the highest LSO freeze duration per topic, in seconds. Useful for debugging issues caused by hanging transactions.



# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 72

def max_lso_time
  topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }

  @data.to_h.each do |topic, metrics|
    topic_without_cg = topic.split('[').first

    metrics.each do |current|
      ls_offset_fd = current.last[:ls_offset_fd] || 0

      # We convert this to seconds from milliseconds due to our Web UI precision
      # Reporting is in ms for consistency
      normalized_fd = (ls_offset_fd / 1_000.0).round

      topics[topic_without_cg][current.first] << normalized_fd
    end
  end

  topics.each_value(&:compact!)
  topics.each_value { |metrics| metrics.transform_values!(&:max) }
  topics.transform_values! { |values| values.to_a.sort_by!(&:first) }
  topics.to_json
end
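
The aggregation above has three steps: drop the consumer-group suffix from the key, convert milliseconds to rounded seconds, and keep the maximum per timestamp. A minimal standalone sketch of those steps follows. The method name `max_freeze_seconds`, the key format and the sample values are assumptions for illustration only.

```ruby
# Standalone sketch; `data` plays the role of @data
def max_freeze_seconds(data)
  # topic name => { time => [durations in seconds from each consumer group] }
  topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }

  data.each do |topic, metrics|
    name = topic.split('[').first

    metrics.each do |time, values|
      # Missing values count as 0 ms; milliseconds become rounded seconds
      topics[name][time] << ((values[:ls_offset_fd] || 0) / 1_000.0).round
    end
  end

  # Keep the largest duration per time and order the series chronologically
  topics.to_h do |name, series|
    [name, series.map { |time, durations| [time, durations.max] }.sort_by(&:first)]
  end
end

data = {
  'orders[group_a]' => [[160, { ls_offset_fd: 2_400 }], [100, { ls_offset_fd: 400 }]],
  'orders[group_b]' => [[100, { ls_offset_fd: 61_600 }], [160, {}]]
}

result = max_freeze_seconds(data)
```

Here both consumer groups collapse into a single `orders` series, and the 61,600 ms freeze reported by one group dominates the 400 ms reported by the other at time 100.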

#topics_pace ⇒ String

Returns JSON with the producers' pace, expressed as the sum of high-watermark offsets for each topic.

Returns:

  • (String)

    JSON with the producers' pace, expressed as the sum of high-watermark offsets for each topic



# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 52

def topics_pace
  topics = {}

  @data.to_h.each do |topic, metrics|
    topic_without_cg = topic.split('[').first

    # If we've already seen this topic data, we can skip
    next if topics.include?(topic_without_cg)

    topics[topic_without_cg] = metrics.map do |current|
      [current.first, current.last[:pace]]
    end
  end

  topics.each_value(&:compact!)
  topics.to_json
end
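
Unlike the lag methods, this one keeps only the first series it encounters for each topic and skips the same topic under other consumer groups. A minimal sketch of that "first seen wins" rule is shown below. The method name `pace_per_topic` and the sample data are hypothetical, and `data` stands in for `@data`.

```ruby
# Standalone sketch; `data` plays the role of @data
def pace_per_topic(data)
  topics = {}

  data.each do |topic, metrics|
    name = topic.split('[').first

    # The same topic under another consumer group is skipped
    next if topics.key?(name)

    topics[name] = metrics.map { |time, values| [time, values[:pace]] }
  end

  topics
end

data = {
  'orders[group_a]' => [[100, { pace: 1_000 }], [160, { pace: 1_250 }]],
  'orders[group_b]' => [[100, { pace: 1_000 }], [160, { pace: 1_250 }]],
  'events[group_a]' => [[100, { pace: 40 }], [160, {}]]
}

result = pace_per_topic(data)
```

A sample without a `:pace` value is kept as a `[time, nil]` pair, so gaps remain visible in the series.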