Class: Karafka::Web::Pro::Ui::Lib::Search::Runner

Inherits:
Object
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/pro/ui/lib/search/runner.rb

Overview

Note:

When running a search from latest, we stop when message timestamp is higher then the time when lookup started. This prevents us from iterating on topics for an extended time period when there are less messages than the requested amount but new are coming in real time. It acts as a virtual “eof”.

Search runner that selects proper matcher, sets the parameters and runs the search We use the Pro Iterator for searching but this runner adds metadata tracking and some other metrics that are useful in the Web UI

Instance Method Summary collapse

Constructor Details

#initialize(topic, partitions_count, search_criteria) ⇒ Runner

Returns a new instance of Runner.

Parameters:

  • topic (String)

    topic in which we want to search

  • partitions_count (Integer)

    how many partitions this topic has

  • search_criteria (Hash)

    normalized search criteria



49
50
51
52
53
54
55
56
57
58
# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 49

def initialize(topic, partitions_count, search_criteria)
  @topic = topic
  @partitions_count = partitions_count
  @search_criteria = search_criteria
  @partitions_stats = Hash.new { |h, k| h[k] = METRICS_BASE.dup }
  @totals_stats = METRICS_BASE.dup
  @timeout = Web.config.ui.search.timeout
  @stop_reason = nil
  @matched = []
end

Instance Method Details

#callArray<Array<Karafka::Messages::Message>, Hash>

Note:

Results are sorted based on the time value.

Runs the search, collects search statistics and returns the results

Returns:

  • (Array<Array<Karafka::Messages::Message>, Hash>)

    array with search results and metadata



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 64

def call
  search_with_stats

  [
    # We return most recent results on top
    @matched.sort_by(&:timestamp).reverse,
    {
      totals: @totals_stats,
      partitions: @partitions_stats,
      stop_reason: @stop_reason
    }.freeze
  ]
end