Class: Karafka::Web::Ui::Models::Message

Inherits: Object
Extended by: Lib::Paginations::Paginators
Defined in: lib/karafka/web/ui/models/message.rb

Overview

A proxy between ::Karafka::Messages::Message and the Web UI. We work with Karafka messages but use this model to wrap the extra work the UI needs.


Class Method Details

.find(topic_id, partition_id, offset) ⇒ Object

Looks for a message from a given topic partition

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • offset (Integer)

Raises:

  • (::Karafka::Web::Errors::Ui::NotFoundError)

    when no message exists at the given offset

# File 'lib/karafka/web/ui/models/message.rb', line 19

def find(topic_id, partition_id, offset)
  message = Lib::Admin.read_topic(
    topic_id,
    partition_id,
    1,
    offset
  ).first

  return message if message

  raise(
    ::Karafka::Web::Errors::Ui::NotFoundError,
    [topic_id, partition_id, offset].join(', ')
  )
end
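The lookup pattern above is straightforward to sketch in isolation. The snippet below mimics `.find` with an in-memory stub in place of `Lib::Admin.read_topic` (which talks to Kafka); the `NotFoundError` class, the `STORE` hash, and the topic name are hypothetical stand-ins, not part of the real API.

```ruby
# Stand-in for ::Karafka::Web::Errors::Ui::NotFoundError
NotFoundError = Class.new(StandardError)

# Pretend storage: { [topic, partition] => { offset => payload } }
STORE = { ['events', 0] => { 10 => 'payload-10' } }.freeze

# Reads at most `count` messages starting from `offset`, like read_topic does
def read_topic(topic, partition, count, offset)
  partition_data = STORE.fetch([topic, partition], {})
  (offset...(offset + count)).filter_map { |o| partition_data[o] }
end

def find(topic, partition, offset)
  message = read_topic(topic, partition, 1, offset).first
  return message if message

  # Mirror the original: raise with a "topic, partition, offset" label
  raise NotFoundError, [topic, partition, offset].join(', ')
end
```

The real method delegates the Kafka read and only adds the found-or-raise contract on top, which is why the sketch stays this small.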

.offset_page(topic_id, partition_id, start_offset, watermark_offsets) ⇒ Array

Fetches the requested per-page number of Kafka messages starting from the oldest requested start_offset. If start_offset is -1, fetches the most recent results.

Parameters:

  • topic_id (String)
  • partition_id (Integer)
  • start_offset (Integer)

    oldest offset from which we want to get the data

  • watermark_offsets (Ui::Models::WatermarkOffsets)

    watermark offsets

Returns:

  • (Array)

    We return the page data as well as all the details needed to build the pagination.



# File 'lib/karafka/web/ui/models/message.rb', line 45

def offset_page(topic_id, partition_id, start_offset, watermark_offsets)
  low_offset = watermark_offsets.low
  high_offset = watermark_offsets.high

  # If we start from offset -1, it means we want the first page with the most recent
  # results. We obtain this page by computing the start offset from the high watermark
  # offset
  start_offset = high_offset - per_page if start_offset == -1

  # No previous pages, no data, and no more offsets
  no_data_result = [false, [], false]

  # If there is no data, we return the no results result
  return no_data_result if low_offset == high_offset

  if start_offset <= low_offset
    # If this page does not contain max per page, compute how many messages we can
    # fetch before stopping
    count = per_page - (low_offset - start_offset)
    next_offset = false
    start_offset = low_offset
  else
    next_offset = start_offset - per_page
    # Do not go below the lowest possible offset
    next_offset = low_offset if next_offset < low_offset
    count = high_offset - start_offset
    # If there would be more messages that we want to get, force max
    count = per_page if count > per_page
  end

  # This code is a bit tricky. Since topics can be compacted and certain offsets may
  # not be present at all, it may happen that we want to read from a non-existing
  # offset. In a case like this we need to catch the error (we do it in `read_topic`)
  # and move to an offset closer to the high offset. This is not fast, but it is an
  # edge case that should not happen often when inspecting real-time data. It can
  # happen more often for heavily compacted topics with short retention, but even
  # then it is acceptable for the 25 elements we usually operate on per page.
  count.times do |index|
    context_offset = start_offset + index
    # We need to get less if we move up with missing offsets to get exactly
    # the number we needed
    context_count = count - index

    messages = read_topic(
      topic_id,
      partition_id,
      context_count,
      context_offset,
      # We do not reset the offset here because we are not interested in seeking from
      # any offset. We are interested in the indication, that there is no offset of a
      # given value so we can try with a more recent one
      'auto.offset.reset': 'error'
    )

    next unless messages

    previous_offset = start_offset + count

    previous_offset = if previous_offset >= high_offset
                        false
                      elsif previous_offset + (per_page - 1) > high_offset
                        high_offset - per_page
                      else
                        previous_offset
                      end

    return [
      previous_offset,
      fill_compacted(messages, partition_id, context_offset, context_count, high_offset).reverse,
      next_offset
    ]
  end

  no_data_result
end
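The windowing arithmetic in `offset_page` can be isolated from the Kafka reads. The sketch below is a minimal stand-alone version of just that arithmetic, assuming a fixed per-page size of 25; the `page_window` name and the return shape `[start_offset, count, next_offset]` are illustrative, not part of the real API.

```ruby
PER_PAGE = 25

# Computes the read window for one page, given the partition watermarks.
# Returns [start_offset, count, next_offset], or nil when there is no data.
def page_window(start_offset, low_offset, high_offset)
  # -1 means "first page with the most recent results"
  start_offset = high_offset - PER_PAGE if start_offset == -1

  # No data between the watermarks
  return nil if low_offset == high_offset

  if start_offset <= low_offset
    # Partial page at the oldest end: shrink the count and stop paging back
    count = PER_PAGE - (low_offset - start_offset)
    next_offset = false
    start_offset = low_offset
  else
    # Next (older) page starts one full page back, clamped to the low watermark
    next_offset = [start_offset - PER_PAGE, low_offset].max
    # Never read past the high watermark or beyond one page
    count = [high_offset - start_offset, PER_PAGE].min
  end

  [start_offset, count, next_offset]
end
```

For example, with watermarks 0 and 100, a `-1` request resolves to the window starting at offset 75 with a next page at 50, and a request starting at the low watermark returns `false` for the next page, ending the pagination.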

.topic_page(topic_id, partitions_ids, page) ⇒ Object

Fetches the requested per-page number of Kafka messages from the topic partitions and merges the results. Ensures that pagination works as expected.

Parameters:

  • topic_id (String)
  • partitions_ids (Array<Integer>)

    partitions for which we want to get the data. This is a limiting factor because we have to query the watermark offsets for each partition independently

  • page (Integer)

    which page we want to get



# File 'lib/karafka/web/ui/models/message.rb', line 129

def topic_page(topic_id, partitions_ids, page)
  # This is the bottleneck, for each partition we make one request :(
  offsets = partitions_ids.map do |partition_id|
    [partition_id, Models::WatermarkOffsets.find(topic_id, partition_id)]
  end.to_h

  # Count number of elements we have in each partition
  # This assumes continuous offset presence from low to high. If not, gaps will be
  # filled the same way we fill them in the per-partition view
  counts = offsets.values.map { |offset| offset[:high] - offset[:low] }

  # Establish initial offsets for the iterator (where to start) per partition
  # We do not use the negative lookup iterator because we already can compute starting
  # offsets. This saves a lot of calls to Kafka
  ranges = Sets.call(counts, page).map do |partition_position, partition_range|
    partition_id = partitions_ids.to_a[partition_position]
    watermarks = offsets[partition_id]

    # We subtract 1 because the high watermark offset is the next incoming offset and
    # not the last one in the topic partition
    lowest = watermarks[:high] - partition_range.last - 1
    highest = watermarks[:high] - partition_range.first - 1

    # This range represents offsets we want to fetch
    [partition_id, lowest..highest]
  end.to_h

  # We start on our topic from the lowest offset for each expected partition
  iterator = Karafka::Pro::Iterator.new(
    { topic_id => ranges.transform_values(&:first) }
  )

  # Build the aggregated representation of each partition's messages, so we can start
  # with the assumption that all the topics are fully compacted. Then we can nicely
  # replace the compacted `false` data with real messages, effectively ensuring that
  # the gaps are filled with `false` out-of-the-box
  aggregated = Hash.new { |h, k| h[k] = {} }

  # We initialize the hash so we have a constant ascending order based on the partition
  # number
  partitions_ids.each { |i| aggregated[i] }

  # We prefill all the potential offsets for each partition, so in case they were
  # compacted, we get a continuous flow
  ranges.each do |partition, range|
    partition_aggr = aggregated[partition]
    range.each { |i| partition_aggr[i] = [partition, i] }
  end

  # Iterate over all partitions and collect data
  iterator.each do |message|
    range = ranges[message.partition]

    # Do not fetch more data from a partition for which we already got the last
    # message from the expected offsets
    # When all partitions are stopped, we will stop operations. This drastically
    # improves performance because we no longer have to poll nils
    iterator.stop_current_partition if message.offset >= range.last

    partition = aggregated[message.partition]
    partition[message.offset] = message
  end

  [
    aggregated.values.map(&:values).map(&:reverse).reduce(:+),
    !Sets.call(counts, page + 1).empty?
  ]
end
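The trickiest step in `topic_page` is translating a per-partition position range (counted from the newest message) into an absolute Kafka offset range, with the off-by-one adjustment for the high watermark. Below is a minimal sketch of just that translation; the `offsets_for` name and the example watermarks are hypothetical.

```ruby
# Maps a position range (0 = newest message) into an absolute offset range,
# given the partition's high watermark.
def offsets_for(high_watermark, position_range)
  # We subtract 1 because the high watermark is the *next* incoming offset,
  # not the last offset present in the topic partition
  lowest = high_watermark - position_range.last - 1
  highest = high_watermark - position_range.first - 1
  lowest..highest
end
```

With a high watermark of 100, the newest 25 positions (`0..24`) map to offsets `75..99`, and the next page of positions (`25..49`) maps to `50..74`, which is exactly the descending-pages behavior the method needs.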