Class: Karafka::Web::Ui::Models::Status

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/web/ui/models/status.rb

Overview

Model that represents the general status of the Web UI.

We use this data to display a status page that helps with debugging on what is missing in the overall setup of the Web UI.

People have various problems like too many partitions, not created topics, etc. and this data and view aims to help them with understanding the current status of the setup

Defined Under Namespace

Classes: Step

Instance Method Summary collapse

Instance Method Details

#connectionStatus::Step

Some people try to work with Kafka over the internet with really high latency and this should be highlighted in the UI as often the connection just becomes unstable

Returns:

  • (Status::Step)

    were we able to connect to Kafka or not and how fast.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/karafka/web/ui/models/status.rb', line 59

def connection
  if enabled.success?
    # Do not connect more than once during the status object lifetime
    @connection_time || connect

    level = if @connection_time < 1_000
              :success
            elsif @connection_time < 1_000_000
              :warning
            else
              :failure
            end
  else
    level = :halted
  end

  Step.new(
    level,
    { time: @connection_time }
  )
end

#consumers_reportsStatus::Step

Returns could we read and operate on the current processes data (if any).

Returns:

  • (Status::Step)

    could we read and operate on the current processes data (if any)



184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/karafka/web/ui/models/status.rb', line 184

def consumers_reports
  if initial_consumers_metrics.success?
    @processes ||= Models::Processes.active(@current_state)
    status = :success
  else
    status = :halted
  end

  Step.new(status, nil)
rescue JSON::ParserError
  Step.new(:failure, nil)
end

#consumers_reports_schema_stateStatus::Step

Returns Are we able to actually digest the consumers reports with the consumer that is consuming them.

Returns:

  • (Status::Step)

    Are we able to actually digest the consumers reports with the consumer that is consuming them.



261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/karafka/web/ui/models/status.rb', line 261

def consumers_reports_schema_state
  status = if state_calculation.success?
             @current_state[:schema_state] == 'compatible' ? :success : :failure
           else
             :halted
           end

  Step.new(
    status,
    nil
  )
end

#enabledObject

Is karafka-web enabled in the karafka.rb Checks if the consumer group for web-ui is injected. It does not check if the group is active because this may depend on the configuration details, but for the Web-UI web app to work, the routing needs to be aware of the deserializer, etc



45
46
47
48
49
50
51
52
53
54
# File 'lib/karafka/web/ui/models/status.rb', line 45

def enabled
  enabled = ::Karafka::App.routes.map(&:name).include?(
    ::Karafka::Web.config.group_id
  )

  Step.new(
    enabled ? :success : :failure,
    nil
  )
end

#initial_consumers_metricsStatus::Step

Returns Is the initial consumers metrics record present in Kafka and that they can be deserialized.

Returns:

  • (Status::Step)

    Is the initial consumers metrics record present in Kafka and that they can be deserialized



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/karafka/web/ui/models/status.rb', line 162

def initial_consumers_metrics
  details = { issue_type: :presence }

  if initial_consumers_state.success?
    begin
      @current_metrics ||= Models::ConsumersMetrics.current
      status = @current_metrics ? :success : :failure
    rescue JSON::ParserError
      status = :failure
      details[:issue_type] = :deserialization
    end
  else
    status = :halted
  end

  Step.new(
    status,
    details
  )
end

#initial_consumers_stateStatus::Step

Returns Is the initial consumers state present in Kafka and that they can be deserialized.

Returns:

  • (Status::Step)

    Is the initial consumers state present in Kafka and that they can be deserialized



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/karafka/web/ui/models/status.rb', line 139

def initial_consumers_state
  details = { issue_type: :presence }

  if replication.success?
    begin
      @current_state ||= Models::ConsumersState.current
      status = @current_state ? :success : :failure
    rescue JSON::ParserError
      status = :failure
      details[:issue_type] = :deserialization
    end
  else
    status = :halted
  end

  Step.new(
    status,
    details
  )
end

#live_reportingStatus::Step

Returns Is there at least one active karafka server reporting to the Web UI.

Returns:

  • (Status::Step)

    Is there at least one active karafka server reporting to the Web UI



199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/karafka/web/ui/models/status.rb', line 199

def live_reporting
  status = if consumers_reports.success?
             @processes.empty? ? :failure : :success
           else
             :halted
           end

  Step.new(
    status,
    nil
  )
end

#materializing_lagStatus::Step

Note:

Since both states and metrics are reported together, it is enough for us to check on one of them.

Returns Is there a significant lag in the reporting of aggregated data back to the Kafka. If yes, it means that the results in the Web UI will be delayed against the reality. Often it means, that there is over-saturation on the consumer that is materializing the states.

Returns:

  • (Status::Step)

    Is there a significant lag in the reporting of aggregated data back to the Kafka. If yes, it means that the results in the Web UI will be delayed against the reality. Often it means, that there is over-saturation on the consumer that is materializing the states.



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/karafka/web/ui/models/status.rb', line 219

def materializing_lag
  max_lag = (Web.config.tracking.interval * 2) / 1_000

  details = { lag: 0, max_lag: max_lag }

  status = if live_reporting.success?
             lag = Time.now.to_f - @current_state.dispatched_at
             details[:lag] = lag

             lag > max_lag ? :failure : :success
           else
             :halted
           end

  Step.new(
    status,
    details
  )
end

#partitionsStatus::Step

Returns do we have all topics with expected number of partitions.

Returns:

  • (Status::Step)

    do we have all topics with expected number of partitions



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/karafka/web/ui/models/status.rb', line 98

def partitions
  if topics.success?
    status = :success
    status = :failure if topics_details[topics_consumers_states][:partitions] != 1
    status = :failure if topics_details[topics_consumers_reports][:partitions] != 1
    status = :failure if topics_details[topics_consumers_metrics][:partitions] != 1
    details = topics_details
  else
    status = :halted
    details = {}
  end

  Step.new(
    status,
    details
  )
end

#pro_subscriptionStatus::Step

Note:

It’s not an error not to have it but we want to warn, that some of the features may not work without Pro.

Returns is Pro enabled with all of its features.

Returns:

  • (Status::Step)

    is Pro enabled with all of its features.



299
300
301
302
303
304
# File 'lib/karafka/web/ui/models/status.rb', line 299

def pro_subscription
  Step.new(
    ::Karafka.pro? ? :success : :warning,
    nil
  )
end

#replicationStatus::Step

Returns do we have correct replication for given env.

Returns:

  • (Status::Step)

    do we have correct replication for given env



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/karafka/web/ui/models/status.rb', line 117

def replication
  if partitions.success?
    status = :success
    # low replication is not an error but just a warning and a potential problem
    # in case of a crash, this is why we do not fail but warn only
    status = :warning if topics_details.values.any? { |det| det[:replication] < 2 }
    # Allow for non-production setups to use replication 1 as it is not that relevant
    status = :success unless Karafka.env.production?
    details = topics_details
  else
    status = :halted
    details = {}
  end

  Step.new(
    status,
    details
  )
end

#routing_topics_presenceStatus::Step

Returns are there any active topics in the routing that are not present in the cluster (does not apply to patterns).

Returns:

  • (Status::Step)

    are there any active topics in the routing that are not present in the cluster (does not apply to patterns)



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/karafka/web/ui/models/status.rb', line 276

def routing_topics_presence
  if consumers_reports_schema_state.success?
    existing = @cluster_info.topics.map { |topic| topic[:topic_name] }

    missing = ::Karafka::App
              .routes
              .flat_map(&:topics)
              .flat_map { |topics| topics.map(&:itself) }
              .select(&:active?)
              .reject { |topic| topic.respond_to?(:patterns?) ? topic.patterns? : false }
              .map(&:name)
              .uniq
              .then { |routed_topics| routed_topics - existing }

    Step.new(missing.empty? ? :success : :warning, missing)
  else
    Step.new(:halted, [])
  end
end

#state_calculationStatus::Step

Returns is there a subscription to our reports topic that is being consumed actively.

Returns:

  • (Status::Step)

    is there a subscription to our reports topic that is being consumed actively.



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/karafka/web/ui/models/status.rb', line 241

def state_calculation
  if materializing_lag.success?
    @subscriptions ||= Models::Health
                       .current(@current_state)
                       .values.map { |consumer_group| consumer_group[:topics] }
                       .flat_map(&:keys)

    status = @subscriptions.include?(topics_consumers_reports) ? :success : :failure
  else
    status = :halted
  end

  Step.new(
    status,
    nil
  )
end

#topicsStatus::Step

Returns do all the needed topics exist.

Returns:



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/karafka/web/ui/models/status.rb', line 82

def topics
  if connection.success?
    details = topics_details
    status = details.all? { |_, detail| detail[:present] } ? :success : :failure
  else
    status = :halted
    details = {}
  end

  Step.new(
    status,
    details
  )
end