Class: Karafka::Routing::SubscriptionGroup
- Inherits:
- 
      Object
      
        - Object
- Karafka::Routing::SubscriptionGroup
 
- Defined in:
- lib/karafka/routing/subscription_group.rb
Overview
One subscription group will always belong to one consumer group, but one consumer group can have multiple subscription groups.
Object representing a set of single consumer group topics that can be subscribed together with one connection.
Instance Attribute Summary collapse
- 
  
    
      #consumer_group  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute consumer_group. 
- 
  
    
      #id  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute id. 
- 
  
    
      #kafka  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute kafka. 
- 
  
    
      #name  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute name. 
- 
  
    
      #topics  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute topics. 
Class Method Summary collapse
- 
  
    
      .id  ⇒ String 
    
    
  
  
  
  
  
  
  
  
  
    Generates new subscription group id that will be used in case of anonymous subscription groups. 
Instance Method Summary collapse
- 
  
    
      #active?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    Is this subscription group one of active once. 
- 
  
    
      #assignments(_consumer)  ⇒ false, Rdkafka::Consumer::TopicPartitionList 
    
    
  
  
  
  
  
  
  
  
  
    List of tpls for direct assignments or false for the normal mode. 
- 
  
    
      #consumer_group_id  ⇒ String 
    
    
  
  
  
  
  
  
  
  
  
    Consumer group id. 
- 
  
    
      #initialize(position, topics)  ⇒ SubscriptionGroup 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    Built subscription group. 
- 
  
    
      #max_messages  ⇒ Integer 
    
    
  
  
  
  
  
  
  
  
  
    Max messages fetched in a single go. 
- 
  
    
      #max_wait_time  ⇒ Integer 
    
    
  
  
  
  
  
  
  
  
  
    Max milliseconds we can wait for incoming messages. 
- 
  
    
      #refresh  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Refreshes the configuration of this subscription group if needed based on the execution context. 
- 
  
    
      #subscriptions  ⇒ false, Array<String> 
    
    
  
  
  
  
  
  
  
  
  
    Names of topics to which we should subscribe or false when operating only on direct assignments. 
- 
  
    
      #to_s  ⇒ String 
    
    
  
  
  
  
  
  
  
  
  
    Id of the subscription group. 
Constructor Details
#initialize(position, topics) ⇒ SubscriptionGroup
Returns built subscription group.
| 45 46 47 48 49 50 51 52 53 54 55 56 57 | # File 'lib/karafka/routing/subscription_group.rb', line 45 def initialize(position, topics) @details = topics.first.subscription_group_details @name = @details.fetch(:name) @consumer_group = topics.first.consumer_group # We include the consumer group id here because we want to have unique ids of subscription # groups across the system. Otherwise user could set the same name for multiple # subscription groups in many consumer groups effectively having same id for different # entities @id = "#{@consumer_group.id}_#{@name}_#{position}" @position = position @topics = topics @kafka = build_kafka end | 
Instance Attribute Details
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
| 17 18 19 | # File 'lib/karafka/routing/subscription_group.rb', line 17 def consumer_group @consumer_group end | 
#id ⇒ Object (readonly)
Returns the value of attribute id.
| 17 18 19 | # File 'lib/karafka/routing/subscription_group.rb', line 17 def id @id end | 
#kafka ⇒ Object (readonly)
Returns the value of attribute kafka.
| 17 18 19 | # File 'lib/karafka/routing/subscription_group.rb', line 17 def kafka @kafka end | 
#name ⇒ Object (readonly)
Returns the value of attribute name.
| 17 18 19 | # File 'lib/karafka/routing/subscription_group.rb', line 17 def name @name end | 
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
| 17 18 19 | # File 'lib/karafka/routing/subscription_group.rb', line 17 def topics @topics end | 
Class Method Details
.id ⇒ String
Generates new subscription group id that will be used in case of anonymous subscription groups
| 28 29 30 31 32 33 34 35 36 37 | # File 'lib/karafka/routing/subscription_group.rb', line 28 def id ID_MUTEX.synchronize do @group_counter ||= 0 @group_counter += 1 ::Digest::SHA256.hexdigest( @group_counter.to_s )[0..11] end end | 
Instance Method Details
#active? ⇒ Boolean
Returns is this subscription group one of active once.
| 75 76 77 | # File 'lib/karafka/routing/subscription_group.rb', line 75 def active? activity_manager.active?(:subscription_groups, name) end | 
#assignments(_consumer) ⇒ false, Rdkafka::Consumer::TopicPartitionList
Returns List of tpls for direct assignments or false for the normal mode.
| 92 93 94 | # File 'lib/karafka/routing/subscription_group.rb', line 92 def assignments(_consumer) false end | 
#consumer_group_id ⇒ String
Returns consumer group id.
| 60 61 62 | # File 'lib/karafka/routing/subscription_group.rb', line 60 def consumer_group_id kafka[:'group.id'] end | 
#max_messages ⇒ Integer
Returns max messages fetched in a single go.
| 65 66 67 | # File 'lib/karafka/routing/subscription_group.rb', line 65 def @topics.first. end | 
#max_wait_time ⇒ Integer
Returns max milliseconds we can wait for incoming messages.
| 70 71 72 | # File 'lib/karafka/routing/subscription_group.rb', line 70 def max_wait_time @topics.first.max_wait_time end | 
#refresh ⇒ Object
Refreshes the configuration of this subscription group if needed based on the execution context.
Since the initial routing setup happens in the supervisor, it is inherited by the children.
This causes incomplete assignment of group.instance.id which is not expanded with proper
node identifier. This refreshes this if needed when in swarm.
| 108 109 110 111 112 113 | # File 'lib/karafka/routing/subscription_group.rb', line 108 def refresh return unless node return unless kafka.key?(:'group.instance.id') @kafka = build_kafka end | 
#subscriptions ⇒ false, Array<String>
Most of the time it should not include inactive topics but in case of pattern matching the matcher topics become inactive down the road, hence we filter out so they are later removed.
Returns names of topics to which we should subscribe or false when operating only on direct assignments.
| 85 86 87 | # File 'lib/karafka/routing/subscription_group.rb', line 85 def subscriptions topics.select(&:active?).map(&:subscription_name) end | 
#to_s ⇒ String
This is an alias for displaying in places where we print the stringified version.
Returns id of the subscription group.
| 98 99 100 | # File 'lib/karafka/routing/subscription_group.rb', line 98 def to_s id end |