Class: Karafka::Web::Ui::Models::Message
- Inherits: Object
- Extended by: Lib::Paginations::Paginators
- Defined in: lib/karafka/web/ui/models/message.rb
Overview
A proxy between ::Karafka::Messages::Message and the Web UI. We work with the Karafka messages but use this model to wrap the work needed.
Class Method Summary
- .find(topic_id, partition_id, offset) ⇒ Object
  Looks for a message from a given topic partition.
- .offset_page(topic_id, partition_id, start_offset, watermark_offsets) ⇒ Array
  Fetches the requested page_count number of Kafka messages starting from the oldest requested start_offset.
- .topic_page(topic_id, partitions_ids, page) ⇒ Object
  Fetches the requested page_count number of Kafka messages from the topic partitions and merges the results.
Class Method Details
.find(topic_id, partition_id, offset) ⇒ Object
Looks for a message from a given topic partition.
# File 'lib/karafka/web/ui/models/message.rb', line 19

def find(topic_id, partition_id, offset)
  message = Lib::Admin.read_topic(
    topic_id,
    partition_id,
    1,
    offset
  ).first

  return message if message

  raise(
    ::Karafka::Web::Errors::Ui::NotFoundError,
    [topic_id, partition_id, offset].join(', ')
  )
end
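Below is a minimal usage sketch (not part of the documented source): the topic name, partition and offset are hypothetical example values, and the call assumes a running application with Karafka Web loaded.

# Hypothetical identifiers used purely for illustration
found = Karafka::Web::Ui::Models::Message.find('system_events', 0, 100)

found.offset    # => 100 when such a message exists
found.partition # => 0

# When the message cannot be read, ::Karafka::Web::Errors::Ui::NotFoundError is raised,
# carrying "topic, partition, offset" as the error details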
.offset_page(topic_id, partition_id, start_offset, watermark_offsets) ⇒ Array
Fetches the requested page_count number of Kafka messages starting from the oldest requested start_offset. If start_offset is -1, it will fetch the most recent results.
# File 'lib/karafka/web/ui/models/message.rb', line 45

def offset_page(topic_id, partition_id, start_offset, watermark_offsets)
  low_offset = watermark_offsets.low
  high_offset = watermark_offsets.high

  # If we start from offset -1, it means we want first page with the most recent
  # results. We obtain this page by using the offset based on the high watermark
  # off
  start_offset = high_offset - per_page if start_offset == -1

  # No previous pages, no data, and no more offsets
  no_data_result = [false, [], false]

  # If there is no data, we return the no results result
  return no_data_result if low_offset == high_offset

  if start_offset <= low_offset
    # If this page does not contain max per page, compute how many messages we can
    # fetch before stopping
    count = per_page - (low_offset - start_offset)
    next_offset = false
    start_offset = low_offset
  else
    next_offset = start_offset - per_page
    # Do not go below the lowest possible offset
    next_offset = low_offset if next_offset < low_offset

    count = high_offset - start_offset
    # If there would be more messages that we want to get, force max
    count = per_page if count > per_page
  end

  # This code is a bit tricky. Since topics can be compacted and certain offsets may
  # not be present at all, it may happen that we want to read from a non-existing
  # offset. In case like this we need to catch this error (we do it in `read_topic`)
  # and we need to move to an offset closer to high offset. This is not fast but it is
  # an edge case that should not happen often when inspecting real time data. This can
  # happen more often for heavily compacted topics with short retention but even then
  # it is ok for 25 elements we usually operate on a single page.
  count.times do |index|
    context_offset = start_offset + index
    # We need to get less if we move up with missing offsets to get exactly
    # the number we needed
    context_count = count - index

    messages = read_topic(
      topic_id,
      partition_id,
      context_count,
      context_offset,
      # We do not reset the offset here because we are not interested in seeking from
      # any offset. We are interested in the indication, that there is no offset of a
      # given value so we can try with a more recent one
      'auto.offset.reset': 'error'
    )

    next unless messages

    previous_offset = start_offset + count

    if previous_offset >= high_offset
      previous_offset = false
    elsif previous_offset + (per_page - 1) > high_offset
      previous_offset = high_offset - per_page
    else
      previous_offset
    end

    return [
      previous_offset,
      fill_compacted(messages, partition_id, context_offset, context_count, high_offset).reverse,
      next_offset
    ]
  end

  no_data_result
end
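For illustration, a hedged sketch of how this method's return contract might be exercised; the topic name and partition id are hypothetical, and the watermark offsets are obtained via Models::WatermarkOffsets, the same helper .topic_page uses.

watermarks = Karafka::Web::Ui::Models::WatermarkOffsets.find('system_events', 0)

# -1 requests the first page, that is the most recent messages
previous_offset, messages, next_offset = Karafka::Web::Ui::Models::Message.offset_page(
  'system_events',
  0,
  -1,
  watermarks
)

# previous_offset and next_offset are integer offsets usable for paging, or false when
# there is no page in the given direction; messages come back newest first, with
# compacted gaps filled by placeholder entries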
.topic_page(topic_id, partitions_ids, page) ⇒ Object
Fetches the requested page_count number of Kafka messages from the topic partitions and merges the results. Ensures that pagination works as expected.
# File 'lib/karafka/web/ui/models/message.rb', line 129

def topic_page(topic_id, partitions_ids, page)
  # This is the bottleneck, for each partition we make one request :(
  offsets = partitions_ids.map do |partition_id|
    [partition_id, Models::WatermarkOffsets.find(topic_id, partition_id)]
  end.to_h

  # Count number of elements we have in each partition
  # This assumes linear presence until low. If not, gaps will be filled like we fill
  # for per partition view
  counts = offsets.values.map { |offset| offset[:high] - offset[:low] }

  # Establish initial offsets for the iterator (where to start) per partition
  # We do not use the negative lookup iterator because we already can compute starting
  # offsets. This saves a lot of calls to Kafka
  ranges = Sets.call(counts, page).map do |partition_position, partition_range|
    partition_id = partitions_ids.to_a[partition_position]
    watermarks = offsets[partition_id]

    lowest = watermarks[:high] - partition_range.last - 1
    # We -1 because high watermark offset is the next incoming offset and not the last
    # one in the topic partition
    highest = watermarks[:high] - partition_range.first - 1

    # This range represents offsets we want to fetch
    [partition_id, lowest..highest]
  end.to_h

  # We start on our topic from the lowest offset for each expected partition
  iterator = Karafka::Pro::Iterator.new(
    { topic_id => ranges.transform_values(&:first) }
  )

  # Build the aggregated representation for each partition messages, so we can start
  # with assumption that all the topics are fully compacted. Then we can nicely replace
  # compacted `false` data with real messages, effectively ensuring that the gaps are
  # filled with `false` out-of-the-box
  aggregated = Hash.new { |h, k| h[k] = {} }

  # We initialize the hash so we have a constant ascending order based on the partition
  # number
  partitions_ids.each { |i| aggregated[i] }

  # We prefill all the potential offsets for each partition, so in case they were
  # compacted, we get a continuous flow
  ranges.each do |partition, range|
    partition_aggr = aggregated[partition]
    range.each { |i| partition_aggr[i] = [partition, i] }
  end

  # Iterate over all partitions and collect data
  iterator.each do |message|
    range = ranges[message.partition]

    # Do not fetch more data from a partition for which we got last message from the
    # expected offsets
    # When all partitions are stopped, we will stop operations. This drastically
    # improves performance because we no longer have to poll nils
    iterator.stop_current_partition if message.offset >= range.last

    partition = aggregated[message.partition]
    partition[message.offset] = message
  end

  [
    aggregated.values.map(&:values).map(&:reverse).reduce(:+),
    !Sets.call(counts, page + 1).empty?
  ]
end
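Finally, a hedged sketch of requesting a merged, multi-partition page; the topic name, partition ids and page number are example values only.

messages, more_pages = Karafka::Web::Ui::Models::Message.topic_page('system_events', [0, 1, 2], 1)

# messages is a single array combining all requested partitions, each partition slice
# ordered from the newest offset down, with compacted offsets kept as
# [partition, offset] placeholders
# more_pages indicates whether requesting page 2 would return any data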