Class: Karafka::Routing::ConsumerGroup
- Inherits:
- 
      Object
      
        - Object
- Karafka::Routing::ConsumerGroup
 
- Defined in:
- lib/karafka/routing/consumer_group.rb
Overview
A single consumer group represents Kafka consumer group, but it may not match 1:1 with subscription groups. There can be more subscription groups than consumer groups
Object used to describe a single consumer group that is going to subscribe to given topics It is a part of Karafka’s DSL
Instance Attribute Summary collapse
- 
  
    
      #current_subscription_group_details  ⇒ Object 
    
    
  
  
  
  
    
    
  
  
  
  
  
  
    This is a “virtual” attribute that is not building subscription groups. 
- 
  
    
      #id  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute id. 
- 
  
    
      #name  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute name. 
- 
  
    
      #topics  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute topics. 
Instance Method Summary collapse
- 
  
    
      #active?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    True if this consumer group should be active in our current process. 
- 
  
    
      #initialize(name)  ⇒ ConsumerGroup 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of ConsumerGroup. 
- 
  
    
      #subscription_group=(name = SubscriptionGroup.id, &block)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Assigns the current subscription group id based on the defined one and allows for further topic definition. 
- 
  
    
      #subscription_groups  ⇒ Array<Routing::SubscriptionGroup> 
    
    
  
  
  
  
  
  
  
  
  
    All the subscription groups build based on the consumer group topics. 
- 
  
    
      #to_h  ⇒ Hash 
    
    
  
  
  
  
  
  
  
  
  
    Hashed version of consumer group that can be used for validation purposes topics inside of it. 
- 
  
    
      #topic=(name, &block)  ⇒ Karafka::Routing::Topic 
    
    
  
  
  
  
  
  
  
  
  
    Builds a topic representation inside of a current consumer group route. 
Constructor Details
#initialize(name) ⇒ ConsumerGroup
Returns a new instance of ConsumerGroup.
| 29 30 31 32 33 34 35 36 37 | # File 'lib/karafka/routing/consumer_group.rb', line 29 def initialize(name) @name = name.to_s # This used to be different when consumer mappers existed but now it is the same @id = @name @topics = Topics.new([]) # Initialize the subscription group so there's always a value for it, since even if not # defined directly, a subscription group will be created @current_subscription_group_details = { name: SubscriptionGroup.id } end | 
Instance Attribute Details
#current_subscription_group_details ⇒ Object
This is a “virtual” attribute that is not building subscription groups. It allows us to store the “current” subscription group defined in the routing This subscription group id is then injected into topics, so we can compute the subscription groups
| 23 24 25 | # File 'lib/karafka/routing/consumer_group.rb', line 23 def current_subscription_group_details @current_subscription_group_details end | 
#id ⇒ Object (readonly)
Returns the value of attribute id.
| 17 18 19 | # File 'lib/karafka/routing/consumer_group.rb', line 17 def id @id end | 
#name ⇒ Object (readonly)
Returns the value of attribute name.
| 17 18 19 | # File 'lib/karafka/routing/consumer_group.rb', line 17 def name @name end | 
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
| 17 18 19 | # File 'lib/karafka/routing/consumer_group.rb', line 17 def topics @topics end | 
Instance Method Details
#active? ⇒ Boolean
Returns true if this consumer group should be active in our current process.
| 40 41 42 | # File 'lib/karafka/routing/consumer_group.rb', line 40 def active? activity_manager.active?(:consumer_groups, name) end | 
#subscription_group=(name = SubscriptionGroup.id, &block) ⇒ Object
Assigns the current subscription group id based on the defined one and allows for further topic definition
| 66 67 68 69 70 71 72 73 74 75 76 | # File 'lib/karafka/routing/consumer_group.rb', line 66 def subscription_group=(name = SubscriptionGroup.id, &block) # We cast it here, so the routing supports symbol based but that's anyhow later on # validated as a string @current_subscription_group_details = { name: name.to_s } Proxy.new(self, &block) # We need to reset the current subscription group after it is used, so it won't leak # outside to other topics that would be defined without a defined subscription group @current_subscription_group_details = { name: SubscriptionGroup.id } end | 
#subscription_groups ⇒ Array<Routing::SubscriptionGroup>
Returns all the subscription groups build based on the consumer group topics.
| 80 81 82 | # File 'lib/karafka/routing/consumer_group.rb', line 80 def subscription_groups @subscription_groups ||= subscription_groups_builder.call(topics) end | 
#to_h ⇒ Hash
Hashed version of consumer group that can be used for validation purposes topics inside of it.
| 87 88 89 90 91 92 | # File 'lib/karafka/routing/consumer_group.rb', line 87 def to_h { topics: topics.map(&:to_h), id: id }.freeze end | 
#topic=(name, &block) ⇒ Karafka::Routing::Topic
Builds a topic representation inside of a current consumer group route
| 48 49 50 51 52 53 54 55 56 57 58 59 60 | # File 'lib/karafka/routing/consumer_group.rb', line 48 def topic=(name, &block) topic = Topic.new(name, self) @topics << Proxy.new( topic, builder.defaults, &block ).target built_topic = @topics.last # We overwrite it conditionally in case it was not set by the user inline in the topic # block definition built_topic.subscription_group_details ||= current_subscription_group_details built_topic end |