Module: Karafka::Routing::Router
- Defined in:
- lib/karafka/routing/router.rb
Overview
Karafka framework router for routing incoming messages to the proper consumers.
Kafka does not provide namespaces or modules for topics, so they all share a "flat" structure. All the routes are therefore stored in a single-level array.
Class Method Summary
-
.find_by(lookup) ⇒ Karafka::Routing::Topic?
Finds the first reference to a given topic based on the provided lookup attributes.
-
.find_or_initialize_by_name(name) ⇒ Karafka::Routing::Topic
Finds the topic by name (in any consumer group) and, if it is not present, builds a new representation of the topic with the defaults and default deserializers.
Class Method Details
.find_by(lookup) ⇒ Karafka::Routing::Topic?
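Finds the first reference to a given topic based on the provided lookup attributes. The lookup is a hash of attribute names and expected values; a topic matches only when all of them are equal. Returns nil when no topic matches.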
# File 'lib/karafka/routing/router.rb', line 13

def find_by(lookup)
  App.consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      return topic if lookup.all? do |attribute, value|
        topic.public_send(attribute) == value
      end
    end
  end

  nil
end
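The lookup semantics can be tried outside of Karafka with a minimal sketch. It assumes hypothetical stand-ins (`SketchTopic`, `SketchGroup`, `SKETCH_GROUPS`, `sketch_find_by`) instead of the real routing classes and only mirrors the matching logic shown above: groups and topics are scanned in order, and the first topic whose attributes all match wins.

```ruby
# Hypothetical stand-ins for illustration only; not Karafka's real classes.
SketchTopic = Struct.new(:name, :active)
SketchGroup = Struct.new(:id, :topics)

SKETCH_GROUPS = [
  SketchGroup.new('group_a', [SketchTopic.new('events', true), SketchTopic.new('logs', false)]),
  SketchGroup.new('group_b', [SketchTopic.new('events', false)])
].freeze

# Same shape as the router's find_by: walk every group, then every topic,
# and return the first topic for which all lookup pairs match.
def sketch_find_by(lookup, consumer_groups = SKETCH_GROUPS)
  consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      return topic if lookup.all? { |attribute, value| topic.public_send(attribute) == value }
    end
  end

  nil
end

# A single attribute returns the first match across all groups.
first_events = sketch_find_by(name: 'events')
# Several attributes must all match, so this one skips group_a.
inactive_events = sketch_find_by(name: 'events', active: false)
# No match yields nil.
missing = sketch_find_by(name: 'unknown')
```

Because the same topic name can appear in more than one consumer group, a name-only lookup returns whichever occurrence comes first in the routing order.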
.find_or_initialize_by_name(name) ⇒ Karafka::Routing::Topic
Finds the topic by name (in any consumer group) and, if it is not present, builds a new representation of the topic with the defaults and default deserializers.
This is used in places where we may operate on topics that are not part of the routing but still want to do something with them (display data, iterate over them, etc.).
Please note that a newly built topic also gets a newly built consumer group, which is not part of the routing.
# File 'lib/karafka/routing/router.rb', line 35

def find_or_initialize_by_name(name)
  existing_topic = find_by(name: name)

  return existing_topic if existing_topic

  virtual_topic = Topic.new(name, ConsumerGroup.new(name))

  Karafka::Routing::Proxy.new(
    virtual_topic,
    Karafka::App.config.internal.routing.builder.defaults
  ).target
end
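The find-or-build behavior can be illustrated with a simplified sketch. It assumes hypothetical stand-ins (`DemoTopic`, `DemoGroup`, `demo_find_or_initialize_by_name`) and leaves out the step in which the real method applies the routing defaults through `Karafka::Routing::Proxy`; it only shows that an unrouted name yields a standalone topic whose consumer group is never added to the routing.

```ruby
# Hypothetical stand-ins for illustration only; not Karafka's real classes.
DemoTopic = Struct.new(:name, :consumer_group)
DemoGroup = Struct.new(:id, :topics)

# One routed group holding one routed topic.
routed_group = DemoGroup.new('app', [])
routed_topic = DemoTopic.new('events', routed_group)
routed_group.topics << routed_topic
demo_groups = [routed_group]

def demo_find_or_initialize_by_name(name, groups)
  groups.each do |group|
    existing = group.topics.find { |topic| topic.name == name }
    return existing if existing
  end

  # Not routed: pair the new topic with a fresh group named after it.
  # The groups list is left untouched, so the new pair stays outside the routing.
  # Applying routing defaults (done by the real method) is omitted in this sketch.
  DemoTopic.new(name, DemoGroup.new(name, []))
end

found = demo_find_or_initialize_by_name('events', demo_groups)
virtual = demo_find_or_initialize_by_name('audit', demo_groups)
```

Here `found` is the routed topic itself, while `virtual` is a new object whose group exists only as its own reference.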