Class: Karafka::Pro::Routing::Features::Patterns::Detector

Inherits:
Object
Defined in:
lib/karafka/pro/routing/features/patterns/detector.rb

Overview

Note:

This is NOT thread-safe and should run in a thread-safe context that guarantees there won’t be any race conditions.

Detects whether a given topic matches any of the patterns and, if so, injects it into the given subscription group routing.

Instance Method Summary

Instance Method Details

#expand(sg_topics, new_topic) ⇒ Object

Checks if the provided topic matches any of the patterns and when detected, expands the routing with it.

Parameters:

  • sg_topics (Array<Karafka::Routing::Topic>)

    given subscription group routing topics.

  • new_topic (String)

    new topic that we have detected



# File 'lib/karafka/pro/routing/features/patterns/detector.rb', line 29

def expand(sg_topics, new_topic)
  MUTEX.synchronize do
    sg_topics
      .map(&:patterns)
      .select(&:active?)
      .select(&:matcher?)
      .map(&:pattern)
      .then { |pts| pts.empty? ? return : pts }
      .then { |pts| Patterns.new(pts) }
      .find(new_topic)
      .then { |pattern| pattern || return }
      .then { |pattern| install(pattern, new_topic, sg_topics) }
  end
end
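
The flow of `#expand` can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than Karafka's real API: `PatternConfig`, `SketchTopic`, `SketchPattern`, `SKETCH_MUTEX` and `sketch_expand` are invented for illustration, a plain `Array#find` with a regular expression takes the place of `Patterns#find`, and appending a new struct takes the place of `install`, whose actual behavior is not shown on this page.

```ruby
# Hypothetical stand-ins; none of these are Karafka's real classes.
PatternConfig = Struct.new(:active, :type, :pattern) do
  def active?
    active
  end

  # Only placeholder topics created from a pattern act as matchers (assumption).
  def matcher?
    type == :matcher
  end
end

SketchTopic = Struct.new(:name, :patterns)
SketchPattern = Struct.new(:name, :regexp)

SKETCH_MUTEX = Mutex.new

def sketch_expand(sg_topics, new_topic)
  SKETCH_MUTEX.synchronize do
    # Collect the patterns of all active matcher topics in the group.
    patterns = sg_topics
               .map(&:patterns)
               .select(&:active?)
               .select(&:matcher?)
               .map(&:pattern)
    return if patterns.empty?

    # Stand-in for Patterns#find: first pattern whose regexp matches the name.
    pattern = patterns.find { |candidate| candidate.regexp.match?(new_topic) }
    return unless pattern

    # Stand-in for install: append a concrete topic built from the pattern.
    discovered = SketchTopic.new(new_topic, PatternConfig.new(true, :discovered, pattern))
    sg_topics << discovered
    discovered
  end
end

orders = SketchPattern.new("orders", /\Aorders\..+\z/)
sg_topics = [SketchTopic.new("orders_placeholder", PatternConfig.new(true, :matcher, orders))]

matched = sketch_expand(sg_topics, "orders.eu")     # appends a topic to sg_topics
unmatched = sketch_expand(sg_topics, "payments.eu") # no pattern matches, returns nil
```

As in the original method, an early `return` inside the `synchronize` block leaves the whole method, and the mutex is released on the way out.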