Module: Karafka::Pro::Routing::Features::DirectAssignments::SubscriptionGroup
- Defined in:
- lib/karafka/pro/routing/features/direct_assignments/subscription_group.rb
Overview
Extension allowing us to select correct subscriptions and assignments based on the expanded routing setup
Instance Method Summary collapse
-
#assignments(consumer) ⇒ Rdkafka::Consumer::TopicPartitionList
Final tpl for assignments.
-
#subscriptions ⇒ false, Array<String>
False if we do not have any subscriptions or array with all the subscriptions for given subscription group.
Instance Method Details
#assignments(consumer) ⇒ Rdkafka::Consumer::TopicPartitionList
Returns final tpl for assignments.
33 34 35 36 37 38 39 40 41 |
# File 'lib/karafka/pro/routing/features/direct_assignments/subscription_group.rb', line 33 def assignments(consumer) topics .select(&:active?) .select { |topic| topic.direct_assignments.active? } .to_h { |topic| build_assignments(topic) } .tap { |topics| return false if topics.empty? } .then { |topics| Iterator::Expander.new.call(topics) } .then { |topics| Iterator::TplBuilder.new(consumer, topics).call } end |
#subscriptions ⇒ false, Array<String>
Returns false if we do not have any subscriptions or array with all the subscriptions for given subscription group.
22 23 24 25 26 27 28 |
# File 'lib/karafka/pro/routing/features/direct_assignments/subscription_group.rb', line 22 def subscriptions topics .select(&:active?) .reject { |topic| topic.direct_assignments.active? } .map(&:subscription_name) .then { |subscriptions| subscriptions.empty? ? false : subscriptions } end |