Class: Karafka::Web::Pro::Ui::Controllers::ExplorerController

Inherits:
BaseController
Defined in:
lib/karafka/web/pro/ui/controllers/explorer_controller.rb

Overview

Data explorer controller

Constant Summary

Constants inherited from Ui::Controllers::BaseController

Ui::Controllers::BaseController::Models

Instance Method Summary

Methods inherited from Ui::Controllers::BaseController

#initialize

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Controllers::BaseController

Instance Method Details

#closest(topic_id, partition_id, time) ⇒ Object

Finds the offset closest to the requested time and redirects to that location. Note that it always redirects to the closest younger (newer) message, never to an older one.

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • time (Time)

    time of the message



# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 194

def closest(topic_id, partition_id, time)
  target = Web::Ui::Lib::Admin.read_topic(topic_id, partition_id, 1, time).first

  partition_path = "explorer/#{topic_id}/#{partition_id}"
  partition_path += "?offset=#{target.offset}" if target

  redirect(partition_path)
end
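
The way the redirect target is assembled can be tried in isolation. The following is a minimal sketch, not the controller itself: `ClosestMessage` and `closest_path` are hypothetical stand-ins introduced only for illustration, and no Kafka lookup is performed.

```ruby
# Hypothetical stand-in for a message returned by the lookup; only the
# offset is needed to build the redirect path
ClosestMessage = Struct.new(:offset)

# Builds the explorer path the same way #closest does: the offset query
# parameter is appended only when a matching message was found
def closest_path(topic_id, partition_id, target)
  path = "explorer/#{topic_id}/#{partition_id}"
  path += "?offset=#{target.offset}" if target
  path
end

puts closest_path('events', 0, ClosestMessage.new(42))
puts closest_path('events', 0, nil)
```

When no message is found, the path points at the partition view without an offset.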

#index ⇒ Object

Lists all the topics we can explore



# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 22

def index
  @topics = Models::ClusterInfo
            .topics
            .sort_by { |topic| topic[:topic_name] }

  unless ::Karafka::Web.config.ui.visibility.internal_topics
    @topics.reject! { |topic| topic[:topic_name].start_with?('__') }
  end

  render
end
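
The sorting and filtering step can be sketched on plain hashes. This is an illustration under assumptions: `visible_topics` and its `show_internal:` flag are hypothetical and stand in for the controller logic and the `internal_topics` visibility setting.

```ruby
# Sorts topics by name and, unless internal topics should be shown,
# drops those whose name starts with a double underscore
def visible_topics(topics, show_internal:)
  sorted = topics.sort_by { |topic| topic[:topic_name] }
  sorted.reject! { |topic| topic[:topic_name].start_with?('__') } unless show_internal
  sorted
end

# Hypothetical topic hashes shaped like the ones used in #index
topics = [
  { topic_name: 'payments' },
  { topic_name: '__consumer_offsets' },
  { topic_name: 'events' }
]

p visible_topics(topics, show_internal: false).map { |topic| topic[:topic_name] }
```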

#partition(topic_id, partition_id) ⇒ Object

Shows messages available in a given partition

Parameters:

  • topic_id (String)
  • partition_id (Integer)


# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 68

def partition(topic_id, partition_id)
  @visibility_filter = ::Karafka::Web.config.ui.policies.messages
  @topic_id = topic_id
  @partition_id = partition_id
  @watermark_offsets = Models::WatermarkOffsets.find(topic_id, partition_id)
  @partitions_count = Models::ClusterInfo.partitions_count(topic_id)

  previous_offset, @messages, next_offset = current_partition_data

  paginate(
    previous_offset,
    @params.current_offset,
    next_offset,
    # If message is an array, it means it's a compacted dummy offset representation
    @messages.map { |message| message.is_a?(Array) ? message.last : message.offset }
  )

  render
end
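
The offset extraction passed to `paginate` handles two kinds of entries: real messages and array placeholders for compacted offsets. A minimal sketch follows, assuming only what the code above shows, namely that the last element of a placeholder array is the offset. `PartitionMessage`, `visible_offsets`, and the `[partition, offset]` layout are hypothetical.

```ruby
# Hypothetical stand-in for a message that responds to #offset
PartitionMessage = Struct.new(:offset)

# Maps a mixed list of messages and placeholder arrays to their offsets
def visible_offsets(items)
  items.map { |item| item.is_a?(Array) ? item.last : item.offset }
end

# The [partition, offset] layout of the placeholder is an assumption;
# only "last element is the offset" comes from the original code
items = [PartitionMessage.new(10), [0, 11], PartitionMessage.new(12)]

p visible_offsets(items)
```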

#recent(topic_id, partition_id) ⇒ Object

Displays the most recent message on a topic/partition

Parameters:

  • topic_id (String)
  • partition_id (Integer, nil)

    partition we’re interested in or nil if we are interested in the most recent message from all the partitions



# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 124

def recent(topic_id, partition_id)
  if partition_id
    active_partitions = [partition_id]
  else
    partitions_count = Models::ClusterInfo.partitions_count(topic_id)
    active_partitions, = Paginators::Partitions.call(partitions_count, 1)
  end

  recent = nil

  # This selects first pages with most recent messages and moves to next if first
  # contains only compacted data, etc.
  #
  # We do it until we find a message we could refer to (if doable) within first
  # ten pages
  10.times do |page|
    messages, = Models::Message.topic_page(topic_id, active_partitions, page + 1)

    # Selects newest out of all partitions
    # Reject compacted messages and transaction-related ones
    recent = messages.reject { |message| message.is_a?(Array) }.max_by(&:timestamp)

    break if recent
  end

  recent || not_found!

  show(topic_id, recent.partition, recent.offset, paginate: false)
end
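
The page-by-page search for the newest usable message can be sketched without Kafka. In this illustration, `RecentMessage`, `find_recent`, and the in-memory `pages` array are hypothetical; arrays inside a page stand for compacted or transaction-related placeholders, as in the code above.

```ruby
# Hypothetical stand-in for a message with the fields #recent relies on
RecentMessage = Struct.new(:partition, :offset, :timestamp)

# Walks up to max_pages pages and returns the newest real message from the
# first page that contains one, or nil when none is found
def find_recent(pages, max_pages: 10)
  pages.first(max_pages).each do |messages|
    recent = messages.reject { |message| message.is_a?(Array) }.max_by(&:timestamp)
    return recent if recent
  end

  nil
end

pages = [
  [[0, 7], [1, 3]], # first page holds placeholders only
  [RecentMessage.new(0, 6, 100), RecentMessage.new(1, 2, 250)]
]

found = find_recent(pages)
puts "partition=#{found.partition} offset=#{found.offset}"
```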

#show(topic_id, partition_id, offset, paginate: true) ⇒ Object

Displays given message

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • offset (Integer)

    offset of the message we want to display

  • paginate (Boolean) (defaults to: true)

    do we want to have pagination



# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 94

def show(topic_id, partition_id, offset, paginate: true)
  Lib::PatternsDetector.new.call

  @visibility_filter = ::Karafka::Web.config.ui.policies.messages
  @topic_id = topic_id
  @partition_id = partition_id
  @offset = offset
  @message = Models::Message.find(@topic_id, @partition_id, @offset)

  @safe_key = Web::Pro::Ui::Lib::SafeRunner.new { @message.key }.tap(&:call)
  @safe_headers = Web::Pro::Ui::Lib::SafeRunner.new { @message.headers }.tap(&:call)
  @safe_payload = Web::Pro::Ui::Lib::SafeRunner.new { @message.payload }.tap(&:call)

  # Pagination may be turned off for certain views, such as the recent view, where
  # we are only ever interested in the most recent message. Displaying pagination
  # there would not make any sense
  if paginate
    # We need watermark offsets to decide if we can paginate left and right
    watermark_offsets = Models::WatermarkOffsets.find(topic_id, partition_id)
    paginate(offset, watermark_offsets.low, watermark_offsets.high)
  end

  render
end
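
The `new { ... }.tap(&:call)` pattern used for the key, headers, and payload wraps a block, runs it once, and keeps the wrapper object. The interface of the real `SafeRunner` is not shown on this page, so the class below is a hypothetical illustration of such a wrapper, assuming its purpose is to capture errors raised by the block instead of letting them propagate.

```ruby
# Hypothetical runner: executes a block and stores either its result or
# the error it raised. This is not the Karafka SafeRunner implementation
class SketchRunner
  attr_reader :result, :error

  def initialize(&block)
    @block = block
    @result = nil
    @error = nil
  end

  # Runs the block and captures any StandardError
  def call
    @result = @block.call
  rescue StandardError => e
    @error = e
  end

  def success?
    @error.nil?
  end
end

# tap(&:call) runs the block and returns the runner itself
ok = SketchRunner.new { 'payload' }.tap(&:call)
failed = SketchRunner.new { raise ArgumentError, 'cannot deserialize' }.tap(&:call)

puts ok.success?
puts failed.success?
```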

#surrounding(topic_id, partition_id, offset) ⇒ Object

Computes a page on which the given offset sits in the middle (if possible). This is often useful when debugging: it lets you jump quickly to the historical location of a message and inspect its surroundings to understand a failure.

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • offset (Integer)

    offset of the message we want to display



# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 161

def surrounding(topic_id, partition_id, offset)
  watermark_offsets = Models::WatermarkOffsets.find(topic_id, partition_id)

  not_found! if offset < watermark_offsets.low
  not_found! if offset >= watermark_offsets.high

  # Assume we start from this offset
  shift = 0
  elements = 0

  # Position the offset as close to the middle of offset based page as possible
  ::Karafka::Web.config.ui.per_page.times do
    break if elements >= ::Karafka::Web.config.ui.per_page

    elements += 1 if offset + shift < watermark_offsets.high

    if offset - shift > watermark_offsets.low
      shift += 1
      elements += 1
    end
  end

  target = offset - shift

  redirect("explorer/#{topic_id}/#{partition_id}?offset=#{target}")
end
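
The centering loop is easier to follow with concrete numbers. The sketch below extracts it into a standalone method: `surrounding_target` is a hypothetical name, and the watermarks and page size are passed in as plain integers instead of being read from Karafka.

```ruby
# Returns the offset at which a page should start so that the requested
# offset lands as close to the middle of the page as the watermarks allow
def surrounding_target(offset, low, high, per_page)
  shift = 0
  elements = 0

  per_page.times do
    break if elements >= per_page

    # Count one slot to the right while it stays below the high watermark
    elements += 1 if offset + shift < high

    # Move the page start one step left while it stays above the low watermark
    if offset - shift > low
      shift += 1
      elements += 1
    end
  end

  offset - shift
end

puts surrounding_target(100, 0, 1_000, 10) # middle of the partition
puts surrounding_target(3, 0, 1_000, 25)   # close to the low watermark
puts surrounding_target(998, 0, 1_000, 10) # close to the high watermark
```

In the first call the page starts at 95, so offset 100 sits in the middle of a ten-element page. In the other two calls the page start is pinned by the nearby watermark.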

#topic(topic_id) ⇒ Object

Note:

This view may not be 100% accurate because it merges data from multiple partitions into a single view, and such a merge is never exact. It can still be used to quickly look at the most recent data flowing through the topic, so it remains useful for aggregated metrics information.

Note:

We cannot use offset references here because each of the partitions may have completely different values

Displays aggregated messages from (potentially) all partitions of a topic

Parameters:

  • topic_id (String)


# File 'lib/karafka/web/pro/ui/controllers/explorer_controller.rb', line 45

def topic(topic_id)
  @visibility_filter = ::Karafka::Web.config.ui.policies.messages

  @topic_id = topic_id
  @partitions_count = Models::ClusterInfo.partitions_count(topic_id)

  @active_partitions, materialized_page, @limited = Paginators::Partitions.call(
    @partitions_count, @params.current_page
  )

  @messages, next_page = Models::Message.topic_page(
    topic_id, @active_partitions, materialized_page
  )

  paginate(@params.current_page, next_page)

  render
end
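
The note about offset references can be illustrated with two partitions whose offsets are far apart. This is a minimal sketch, assuming messages are interleaved by timestamp; the ordering actually used by `Models::Message.topic_page` is not shown on this page, and `TopicMessage` is a hypothetical stand-in.

```ruby
# Hypothetical stand-in for a message in the aggregated view
TopicMessage = Struct.new(:partition, :offset, :timestamp)

partition_zero = [TopicMessage.new(0, 5_000, 100), TopicMessage.new(0, 5_001, 130)]
partition_one = [TopicMessage.new(1, 12, 110), TopicMessage.new(1, 13, 120)]

# Newest first across both partitions (assumed ordering, for illustration)
merged = (partition_zero + partition_one).sort_by(&:timestamp).reverse

merged.each { |message| puts "partition=#{message.partition} offset=#{message.offset}" }
```

The offsets in the merged list jump between unrelated ranges, which is why a single offset cannot serve as a position in this view.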