Class: Karafka::Web::Pro::Ui::Lib::Search::Runner

Inherits:
Object
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/pro/ui/lib/search/runner.rb

Overview

Note:

When running a search from latest, we stop when message timestamp is higher then the time when lookup started. This prevents us from iterating on topics for an extended time period when there are less messages than the requested amount but new are coming in real time. It acts as a virtual “eof”.

Search runner that selects proper matcher, sets the parameters and runs the search We use the Pro Iterator for searching but this runner adds metadata tracking and some other metrics that are useful in the Web UI

Instance Method Summary collapse

Constructor Details

#initialize(topic, partitions_count, search_criteria) ⇒ Runner

Returns a new instance of Runner.

Parameters:

  • topic (String)

    topic in which we want to search

  • partitions_count (Integer)

    how many partitions this topic has

  • search_criteria (Hash)

    normalized search criteria



57
58
59
60
61
62
63
64
65
66
# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 57

def initialize(topic, partitions_count, search_criteria)
  @topic = topic
  @partitions_count = partitions_count
  @search_criteria = search_criteria
  @partitions_stats = Hash.new { |h, k| h[k] = METRICS_BASE.dup }
  @totals_stats = METRICS_BASE.dup
  @timeout = Web.config.ui.search.timeout
  @stop_reason = nil
  @matched = []
end

Instance Method Details

#callArray<Array<Karafka::Messages::Message>, Hash>

Note:

Results are sorted based on the time value.

Runs the search, collects search statistics and returns the results

Returns:

  • (Array<Array<Karafka::Messages::Message>, Hash>)

    array with search results and metadata



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 72

def call
  search_with_stats

  [
    # We return most recent results on top
    @matched.sort_by(&:timestamp).reverse,
    {
      totals: @totals_stats,
      partitions: @partitions_stats,
      stop_reason: @stop_reason
    }.freeze
  ]
end