Class: Karafka::Web::Ui::Models::Message
- Inherits: Object
- Extended by: Lib::Paginations::Paginators
- Defined in: lib/karafka/web/ui/models/message.rb
Overview
A proxy between ::Karafka::Messages::Message and the Web UI. We work with the Karafka messages but use this model to wrap the extra work needed.
Class Method Summary
- .find(topic_id, partition_id, offset, watermark_offsets: false) ⇒ Karafka::Messages::Message?
  Looks for a message from a given topic partition.
- .offset_page(topic_id, partition_id, start_offset, watermark_offsets) ⇒ Array
  Fetches the requested page_count number of Kafka messages starting from the oldest requested start_offset.
- .topic_page(topic_id, partitions_ids, page) ⇒ Object
  Fetches the requested page_count number of Kafka messages from the topic partitions and merges the results.
Class Method Details
.find(topic_id, partition_id, offset, watermark_offsets: false) ⇒ Karafka::Messages::Message?
Note: if no watermark offsets are provided, this method will always raise when there is no message with data under the requested offset.
Looks for a message from a given topic partition. When no watermark offsets are provided, it raises if there is no data under the given offset. If watermarks are provided, it checks whether the requested offset points to a system entry or a compacted message and in such cases returns nil. It always raises when the request is out of the watermark range.
# File 'lib/karafka/web/ui/models/message.rb', line 27

def find(topic_id, partition_id, offset, watermark_offsets: false)
  message = Lib::Admin.read_topic(
    topic_id,
    partition_id,
    1,
    offset
  ).first

  return message if message

  # Not found can also occur for system entries and compacted messages.
  # Since we want to know about this in some cases we handle this case and check if the
  # requested offset is within the range and if so, it means it has been cleaned or
  # is a system entry. In such cases we do display user an info message.
  return nil if watermark_offsets &&
                offset >= watermark_offsets.low &&
                offset < watermark_offsets.high

  # If beyond the watermark offsets, we raise 404 as user should not reach such
  # non-existent messages as we cannot reason about them
  raise(
    ::Karafka::Web::Errors::Ui::NotFoundError,
    [topic_id, partition_id, offset].join(', ')
  )
end
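For illustration only, a minimal usage sketch (not part of the gem's documented examples): the 'payments' topic, partition 0 and offset 100 are assumptions, and the WatermarkOffsets lookup mirrors the one used by .topic_page below.

# Hypothetical usage sketch, not part of the documented API surface
watermarks = Karafka::Web::Ui::Models::WatermarkOffsets.find('payments', 0)

message = Karafka::Web::Ui::Models::Message.find(
  'payments',
  0,
  100,
  watermark_offsets: watermarks
)

if message
  puts message.raw_payload
else
  # nil means the offset lies within the watermark range but was compacted away
  # or is a system entry (for example a transaction marker)
  puts 'Compacted or system entry'
end

# Without watermark_offsets, or for an offset outside the watermark range,
# ::Karafka::Web::Errors::Ui::NotFoundError is raised instead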
.offset_page(topic_id, partition_id, start_offset, watermark_offsets) ⇒ Array
Fetches the requested page_count number of Kafka messages starting from the oldest requested start_offset. If start_offset is -1, fetches the most recent results.
# File 'lib/karafka/web/ui/models/message.rb', line 63

def offset_page(topic_id, partition_id, start_offset, watermark_offsets)
  low_offset = watermark_offsets.low
  high_offset = watermark_offsets.high

  # If we start from offset -1, it means we want first page with the most recent
  # results. We obtain this page by using the offset based on the high watermark
  # offset
  start_offset = high_offset - per_page if start_offset == -1

  # No previous pages, no data, and no more offsets
  no_data_result = [false, [], false]

  # If there is no data, we return the no results result
  return no_data_result if low_offset == high_offset

  if start_offset <= low_offset
    # If this page does not contain max per page, compute how many messages we can
    # fetch before stopping
    count = per_page - (low_offset - start_offset)
    next_offset = false
    start_offset = low_offset
  else
    next_offset = start_offset - per_page
    # Do not go below the lowest possible offset
    next_offset = low_offset if next_offset < low_offset
    count = high_offset - start_offset
    # If there would be more messages that we want to get, force max
    count = per_page if count > per_page
  end

  # This code is a bit tricky. Since topics can be compacted and certain offsets may
  # not be present at all, it may happen that we want to read from a non-existing
  # offset. In case like this we need to catch this error (we do it in `read_topic`)
  # and we need to move to an offset closer to high offset. This is not fast but it is
  # an edge case that should not happen often when inspecting real time data. This can
  # happen more often for heavily compacted topics with short retention but even then
  # it is ok for 25 elements we usually operate on a single page.
  count.times do |index|
    context_offset = start_offset + index
    # We need to get less if we move up with missing offsets to get exactly
    # the number we needed
    context_count = count - index

    messages = read_topic(
      topic_id,
      partition_id,
      context_count,
      context_offset,
      # We do not reset the offset here because we are not interested in seeking from
      # any offset. We are interested in the indication, that there is no offset of a
      # given value so we can try with a more recent one
      'auto.offset.reset': 'error'
    )

    next unless messages

    previous_offset = start_offset + count

    if previous_offset >= high_offset
      previous_offset = false
    elsif previous_offset + (per_page - 1) > high_offset
      previous_offset = high_offset - per_page
    else
      previous_offset
    end

    return [
      previous_offset,
      fill_compacted(messages, partition_id, context_offset, context_count, high_offset).reverse,
      next_offset
    ]
  end

  no_data_result
end
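A hedged sketch of how the returned [previous_offset, messages, next_offset] triple could drive pagination; the topic name and partition are assumptions.

# Hypothetical pagination sketch based on the return shape shown above
watermarks = Karafka::Web::Ui::Models::WatermarkOffsets.find('payments', 0)

previous_offset, messages, next_offset = Karafka::Web::Ui::Models::Message.offset_page(
  'payments',
  0,
  -1, # -1 means "start from the most recent page"
  watermarks
)

# Compacted gaps on the page are filled in by fill_compacted with placeholder entries
messages.each { |m| puts m.respond_to?(:offset) ? m.offset : 'compacted' }

# next_offset points at the older page and previous_offset at the newer one;
# either is false when there is no such page
puts "older page starts at #{next_offset}" if next_offset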
.topic_page(topic_id, partitions_ids, page) ⇒ Object
Fetches the requested page_count number of Kafka messages from the topic partitions and merges the results. Ensures that pagination works as expected.
# File 'lib/karafka/web/ui/models/message.rb', line 147

def topic_page(topic_id, partitions_ids, page)
  # This is the bottleneck, for each partition we make one request :(
  offsets = partitions_ids.map do |partition_id|
    [partition_id, Models::WatermarkOffsets.find(topic_id, partition_id)]
  end.to_h

  # Count number of elements we have in each partition
  # This assumes linear presence until low. If not, gaps will be filled like we fill
  # for per partition view
  counts = offsets.values.map { |offset| offset[:high] - offset[:low] }

  # Establish initial offsets for the iterator (where to start) per partition
  # We do not use the negative lookup iterator because we already can compute starting
  # offsets. This saves a lot of calls to Kafka
  ranges = Sets.call(counts, page).map do |partition_position, partition_range|
    partition_id = partitions_ids.to_a[partition_position]
    watermarks = offsets[partition_id]

    lowest = watermarks[:high] - partition_range.last - 1
    # We -1 because high watermark offset is the next incoming offset and not the last
    # one in the topic partition
    highest = watermarks[:high] - partition_range.first - 1

    # This range represents offsets we want to fetch
    [partition_id, lowest..highest]
  end.to_h

  # We start on our topic from the lowest offset for each expected partition
  iterator = Karafka::Pro::Iterator.new(
    { topic_id => ranges.transform_values(&:first) }
  )

  # Build the aggregated representation for each partition messages, so we can start
  # with assumption that all the topics are fully compacted. Then we can nicely replace
  # compacted `false` data with real messages, effectively ensuring that the gaps are
  # filled with `false` out-of-the-box
  aggregated = Hash.new { |h, k| h[k] = {} }

  # We initialize the hash so we have a constant ascending order based on the partition
  # number
  partitions_ids.each { |i| aggregated[i] }

  # We prefill all the potential offsets for each partition, so in case they were
  # compacted, we get a continuous flow
  ranges.each do |partition, range|
    partition_aggr = aggregated[partition]
    range.each { |i| partition_aggr[i] = [partition, i] }
  end

  # Iterate over all partitions and collect data
  iterator.each do |message|
    range = ranges[message.partition]

    # Do not fetch more data from a partition for which we got last message from the
    # expected offsets
    # When all partitions are stopped, we will stop operations. This drastically
    # improves performance because we no longer have to poll nils
    iterator.stop_current_partition if message.offset >= range.last

    partition = aggregated[message.partition]
    partition[message.offset] = message
  end

  [
    aggregated.values.map(&:values).map(&:reverse).reduce(:+),
    !Sets.call(counts, page + 1).empty?
  ]
end
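A hypothetical sketch of paging through a whole topic; the topic name, partition ids and page number are assumptions, and the placeholder handling follows the prefill logic shown above.

# Hypothetical sketch of collecting one merged page across several partitions
messages, more_pages = Karafka::Web::Ui::Models::Message.topic_page(
  'payments',
  [0, 1, 2],
  1
)

messages.each do |m|
  if m.is_a?(Array)
    # Placeholder [partition, offset] pair left in place of a compacted offset
    partition, offset = m
    puts "#{partition}/#{offset}: compacted"
  else
    puts "#{m.partition}/#{m.offset}: #{m.raw_payload}"
  end
end

puts 'More pages available' if more_pages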