Class: Karafka::Web::Ui::Models::Metrics::Charts::Topics
- Inherits:
-
Object
- Object
- Karafka::Web::Ui::Models::Metrics::Charts::Topics
- Defined in:
- lib/karafka/web/ui/models/metrics/charts/topics.rb
Overview
Model for preparing data about topics states
Instance Method Summary collapse
-
#initialize(topics_data, period) ⇒ Topics
constructor
A new instance of Topics.
-
#lags_hybrid ⇒ String
JSON with lags of each of the topics + total lag of all the topics from all the consumer groups.
-
#max_lso_time ⇒ String
JSON with per-topic, highest LSO freeze duration.
-
#topics_pace ⇒ String
JSON with producers pace that represents high-watermarks sum for each topic.
Constructor Details
#initialize(topics_data, period) ⇒ Topics
Returns a new instance of Topics.
13 14 15 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 13 def initialize(topics_data, period) @data = topics_data.to_h.fetch(period) end |
Instance Method Details
#lags_hybrid ⇒ String
Returns JSON with lags of each of the topics + total lag of all the topics from all the consumer groups.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 19 def lags_hybrid total = Hash.new { |h, v| h[v] = 0 } @data.to_h.each_value do |metrics| metrics.each do |metric| time = metric.first lag_hybrid = metric.last[:lag_hybrid] if lag_hybrid total[time] ||= 0 total[time] += lag_hybrid else next if total.key?(time) total[time] = nil end end end # Extract the lag stored only from all the data per_topic = @data.to_h.map do |topic, metrics| extracted = metrics.map { |metric| [metric.first, metric.last[:lag_hybrid]] } [topic, extracted] end.to_h # We name it with a space because someone may have a topic called "total" and we # want to avoid collisions per_topic.merge('total sum' => total.to_a).to_json end |
#max_lso_time ⇒ String
Returns JSON with per-topic, highest LSO freeze duration. Useful for debugging of issues arising from hanging transactions.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 72 def max_lso_time topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } } @data.to_h.each do |topic, metrics| topic_without_cg = topic.split('[').first metrics.each do |current| ls_offset_fd = current.last[:ls_offset_fd] || 0 # We convert this to seconds from milliseconds due to our Web UI precision # Reporting is in ms for consistency normalized_fd = (ls_offset_fd / 1_000.0).round topics[topic_without_cg][current.first] << normalized_fd end end topics.each_value(&:compact!) topics.each_value { |metrics| metrics.transform_values!(&:max) } topics.transform_values! { |values| values.to_a.sort_by!(&:first) } topics.to_json end |
#topics_pace ⇒ String
Returns JSON with producers pace that represents high-watermarks sum for each topic.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 52 def topics_pace topics = {} @data.to_h.each do |topic, metrics| topic_without_cg = topic.split('[').first # If we've already seen this topic data, we can skip next if topics.include?(topic_without_cg) topics[topic_without_cg] = metrics.map do |current| [current.first, current.last[:pace]] end end topics.each_value(&:compact!) topics.to_json end |