Class: Karafka::Web::Ui::Pro::Controllers::Explorer

Inherits:
Controllers::Base show all
Includes:
Lib::Paginations
Defined in:
lib/karafka/web/ui/pro/controllers/explorer.rb

Overview

Data explorer controller

Instance Method Summary collapse

Methods inherited from Controllers::Base

#initialize

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Controllers::Base

Instance Method Details

#closest(topic_id, partition_id, time) ⇒ Object

Finds the closest offset matching the requested time and redirects to this location Note, that it redirects to closest but always younger.

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • time (Time)

    time of the message



197
198
199
200
201
202
203
204
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 197

def closest(topic_id, partition_id, time)
  target = Lib::Admin.read_topic(topic_id, partition_id, 1, time).first

  partition_path = "explorer/#{topic_id}/#{partition_id}"
  partition_path += "?offset=#{target.offset}" if target

  redirect(partition_path)
end

#indexObject

Lists all the topics we can explore



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 24

def index
  @topics = Models::ClusterInfo
            .topics
            .sort_by { |topic| topic[:topic_name] }

  unless ::Karafka::Web.config.ui.visibility.internal_topics
    @topics.reject! { |topic| topic[:topic_name].start_with?('__') }
  end

  render
end

#partition(topic_id, partition_id) ⇒ Object

Shows messages available in a given partition

Parameters:

  • topic_id (String)
  • partition_id (Integer)


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 70

def partition(topic_id, partition_id)
  @visibility_filter = ::Karafka::Web.config.ui.visibility.filter
  @topic_id = topic_id
  @partition_id = partition_id
  @watermark_offsets = Ui::Models::WatermarkOffsets.find(topic_id, partition_id)
  @partitions_count = Models::ClusterInfo.partitions_count(topic_id)

  previous_offset, @messages, next_offset = current_partition_data

  paginate(
    previous_offset,
    @params.current_offset,
    next_offset,
    # If message is an array, it means it's a compacted dummy offset representation
    @messages.map { |message| message.is_a?(Array) ? message.last : message.offset }
  )

  render
end

#recent(topic_id, partition_id) ⇒ Object

Displays the most recent message on a topic/partition

Parameters:

  • topic_id (String)
  • partition_id (Integer, nil)

    partition we’re interested in or nil if we are interested in the most recent message from all the partitions



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 127

def recent(topic_id, partition_id)
  if partition_id
    active_partitions = [partition_id]
  else
    partitions_count = Models::ClusterInfo.partitions_count(topic_id)
    active_partitions, = Paginators::Partitions.call(partitions_count, 1)
  end

  recent = nil

  # This selects first pages with most recent messages and moves to next if first
  # contains only compacted data, etc.
  #
  # We do it until we find a message we could refer to (if doable) within first
  # ten pages
  10.times do |page|
    messages, = Models::Message.topic_page(topic_id, active_partitions, page + 1)

    # Selects newest out of all partitions
    # Reject compacted messages and transaction-related once
    recent = messages.reject { |message| message.is_a?(Array) }.max_by(&:timestamp)

    break if recent
  end

  recent || raise(::Karafka::Web::Errors::Ui::NotFoundError)

  show(topic_id, recent.partition, recent.offset, paginate: false)
end

#show(topic_id, partition_id, offset, paginate: true) ⇒ Object

Displays given message

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • offset (Integer)

    offset of the message we want to display

  • paginate (Boolean) (defaults to: true)

    do we want to have pagination



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 96

def show(topic_id, partition_id, offset, paginate: true)
  @visibility_filter = ::Karafka::Web.config.ui.visibility.filter
  @topic_id = topic_id
  @partition_id = partition_id
  @offset = offset
  @message = Ui::Models::Message.find(@topic_id, @partition_id, @offset)
  @payload_error = false

  begin
    @pretty_payload = JSON.pretty_generate(@message.payload)
  rescue StandardError => e
    @payload_error = e
  end

  # This may be off for certain views like recent view where we are interested only
  # in the most recent all the time. It does not make any sense to display pagination
  # there
  if paginate
    # We need watermark offsets to decide if we can paginate left and right
    watermark_offsets = Ui::Models::WatermarkOffsets.find(topic_id, partition_id)
    paginate(offset, watermark_offsets.low, watermark_offsets.high)
  end

  render
end

#surrounding(topic_id, partition_id, offset) ⇒ Object

Computes a page on which the given offset is in the middle of the page (if possible) Useful often when debugging to be able to quickly jump to the historical location of message and its surrounding to understand failure

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • offset (Integer)

    offset of the message we want to display

Raises:



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 164

def surrounding(topic_id, partition_id, offset)
  watermark_offsets = Ui::Models::WatermarkOffsets.find(topic_id, partition_id)

  raise ::Karafka::Web::Errors::Ui::NotFoundError if offset < watermark_offsets.low
  raise ::Karafka::Web::Errors::Ui::NotFoundError if offset >= watermark_offsets.high

  # Assume we start from this offset
  shift = 0
  elements = 0

  # Position the offset as close to the middle of offset based page as possible
  ::Karafka::Web.config.ui.per_page.times do
    break if elements >= ::Karafka::Web.config.ui.per_page

    elements += 1 if offset + shift < watermark_offsets.high

    if offset - shift > watermark_offsets.low
      shift += 1
      elements += 1
    end
  end

  target = offset - shift

  redirect("explorer/#{topic_id}/#{partition_id}?offset=#{target}")
end

#topic(topic_id) ⇒ Object

Note:

This view may not be 100% accurate because we merge multiple partitions data into a single view and this is never accurate. It can be used however to quickly look at most recent data flowing, etc, hence it is still useful for aggregated metrics information

Note:

We cannot use offset references here because each of the partitions may have completely different values

Displays aggregated messages from (potentially) all partitions of a topic

Parameters:

  • topic_id (String)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/karafka/web/ui/pro/controllers/explorer.rb', line 47

def topic(topic_id)
  @visibility_filter = ::Karafka::Web.config.ui.visibility.filter

  @topic_id = topic_id
  @partitions_count = Models::ClusterInfo.partitions_count(topic_id)

  @active_partitions, materialized_page, @limited = Paginators::Partitions.call(
    @partitions_count, @params.current_page
  )

  @messages, next_page = Models::Message.topic_page(
    topic_id, @active_partitions, materialized_page
  )

  paginate(@params.current_page, next_page)

  render
end