Class: Karafka::Web::Ui::Models::Metrics::Charts::Topics
- Inherits:
-
Object
- Object
- Karafka::Web::Ui::Models::Metrics::Charts::Topics
- Defined in:
- lib/karafka/web/ui/models/metrics/charts/topics.rb
Overview
Model for preparing data about topics states
Instance Method Summary collapse
-
#initialize(topics_data, period) ⇒ Topics
constructor
A new instance of Topics.
-
#lags_hybrid ⇒ String
JSON with lags of each of the topics + total lag of all the topics from all the consumer groups.
-
#max_lso ⇒ String
JSON with per-topic, highest LSO freeze duration.
-
#pace ⇒ String
JSON with producers pace that represents high-watermarks sum for each topic.
Constructor Details
#initialize(topics_data, period) ⇒ Topics
Returns a new instance of Topics.
13 14 15 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 13 def initialize(topics_data, period) @data = topics_data.to_h.fetch(period) end |
Instance Method Details
#lags_hybrid ⇒ String
Returns JSON with lags of each of the topics + total lag of all the topics from all the consumer groups.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 19 def lags_hybrid total = Hash.new { |h, v| h[v] = 0 } @data.to_h.each_value do |metrics| metrics.each do |metric| time = metric.first lag_hybrid = metric.last[:lag_hybrid] if lag_hybrid total[time] ||= 0 total[time] += lag_hybrid else next if total.key?(time) total[time] = nil end end end # Extract the lag stored only from all the data per_topic = @data.to_h.map do |topic, metrics| extracted = metrics.map { |metric| [metric.first, metric.last[:lag_hybrid]] } [topic, extracted] end.to_h # We name it with a space because someone may have a topic called "total" and we # want to avoid collisions per_topic.merge('total sum' => total.to_a).to_json end |
#max_lso ⇒ String
Returns JSON with per-topic, highest LSO freeze duration. Useful for debugging of issues arising from hanging transactions.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 52 def max_lso topics = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } } @data.to_h.each do |topic, metrics| topic_without_cg = topic.split('[').first metrics.each do |current| ls_offset_fd = current.last[:ls_offset_fd] || 0 # We convert this to seconds from milliseconds due to our Web UI precision # Reporting is in ms for consistency normalized_fd = (ls_offset_fd / 1_000.0).round topics[topic_without_cg][current.first] << normalized_fd end end topics.each_value(&:compact!) topics.each_value { |metrics| metrics.transform_values!(&:max) } topics.transform_values! { |values| values.to_a.sort_by!(&:first) } topics.to_json end |
#pace ⇒ String
There is a case where data reported (sum on a topic) is lower then the previous value. This can happen around rebalances because consumer may not have all watermark offsets reported. This may cause consumers not to report some of the partitions, effectively lowering the sum. Since high-watermark offsets can only move forward, we compensate this by assuming that a lower value than previous is an artefact of that type and we replace it with the max value we had effectively compensating for under-reporting
Returns JSON with producers pace that represents high-watermarks sum for each topic.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/karafka/web/ui/models/metrics/charts/topics.rb', line 85 def pace topics = {} @data.to_h.each do |topic, metrics| topic_without_cg = topic.split('[').first # If we've already seen this topic data, we can skip next if topics.include?(topic_without_cg) max_pace = 0 topics[topic_without_cg] = metrics.map do |current| # Pace may be empty when for a given moment in time we got no info on # one of the topics. In such case we can compensate with max or 0 current_pace = current.last[:pace] || 0 max_pace = current_pace if current_pace > max_pace [current.first, max_pace] end end topics.each_value(&:compact!) topics.to_json end |