Class: Karafka::Connection::ConsumerGroupCoordinator
- Inherits: Object
  - Object
  - Karafka::Connection::ConsumerGroupCoordinator
- Defined in: lib/karafka/connection/consumer_group_coordinator.rb
Overview
This object represents the collective execution status of a group of listeners that run inside one consumer group but in separate subscription groups.
There are cases when we do not want to close a given client while others from the same consumer group are still running, because shutting down some clients early can destabilize the rest of the group.
We also want to make sure that we close one consumer at a time while the others continue polling.
This prevents a scenario where a rebalance is not acknowledged and we lose the assignment without having a chance to commit changes.
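The coordination described above can be sketched as follows. This is a minimal illustration, not Karafka's listener code: `CoordinatorSketch` is a hypothetical stand-in whose method bodies are copied from the listings on this page so the example runs without Karafka installed, and `run_listeners`, the thread-per-listener layout, and the 1 ms polling interval are assumptions made for the example.

```ruby
require 'set'

# Hypothetical stand-in that mirrors the method bodies listed on this page.
class CoordinatorSketch
  def initialize(group_size)
    @shutdown_mutex = Mutex.new
    @group_size = group_size
    @finished = Set.new
  end

  def finished?
    @finished.size == @group_size
  end

  def shutdown?
    finished? && @shutdown_mutex.try_lock
  end

  def unlock
    @shutdown_mutex.unlock if @shutdown_mutex.owned?
  end

  def finish_work(listener_id)
    @finished << listener_id
  end
end

# Hypothetical driver: each thread plays the role of one listener.
# Returns the listener ids in the order in which they closed.
def run_listeners(group_size)
  coordinator = CoordinatorSketch.new(group_size)
  closed = Queue.new

  threads = Array.new(group_size) do |listener_id|
    Thread.new do
      coordinator.finish_work(listener_id)
      # A real listener would keep polling here; the sketch just waits
      # until the whole group is finished and the shutdown lock is free.
      sleep(0.001) until coordinator.shutdown?
      begin
        closed << listener_id # stands in for closing the client
      ensure
        coordinator.unlock # let the next listener close
      end
    end
  end

  threads.each(&:join)
  Array.new(closed.size) { closed.pop }
end

p run_listeners(3).sort
```

Every listener reports that it has finished, but only the one holding the shutdown lock closes at any given moment; the rest keep waiting (in practice, polling) until the lock is released.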
Instance Method Summary
- #finish_work(listener_id) ⇒ Object
  Marks given listener as finished.
- #finished? ⇒ Boolean
  True if all the subscription groups from a given consumer group are finished.
- #initialize(group_size) ⇒ ConsumerGroupCoordinator (constructor)
  A new instance of ConsumerGroupCoordinator.
- #shutdown? ⇒ Boolean
  Whether shutdown can start on a given listener.
- #unlock ⇒ Object
  Unlocks the shutdown lock.
Constructor Details
#initialize(group_size) ⇒ ConsumerGroupCoordinator
Returns a new instance of ConsumerGroupCoordinator.
    # File 'lib/karafka/connection/consumer_group_coordinator.rb', line 18
    def initialize(group_size)
      @shutdown_mutex = Mutex.new
      @group_size = group_size
      @finished = Set.new
    end
Instance Method Details
#finish_work(listener_id) ⇒ Object
Marks given listener as finished.

    # File 'lib/karafka/connection/consumer_group_coordinator.rb', line 43
    def finish_work(listener_id)
      @finished << listener_id
    end
#finished? ⇒ Boolean
Returns true if all the subscription groups from a given consumer group are finished.
    # File 'lib/karafka/connection/consumer_group_coordinator.rb', line 26
    def finished?
      @finished.size == @group_size
    end
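Because finished listeners are tracked in a `Set`, a listener that reports completion more than once is still counted once. The sketch below shows that property in isolation; `all_finished?` is a hypothetical helper that repeats the size comparison from `finished?`, and the listener ids are made up for the example.

```ruby
require 'set'

# Hypothetical helper repeating the comparison used by finished?.
def all_finished?(finished, group_size)
  finished.size == group_size
end

finished = Set.new
finished << :listener_a
finished << :listener_a # duplicate report, ignored by the Set

puts all_finished?(finished, 2) # false: only one distinct listener so far

finished << :listener_b
puts all_finished?(finished, 2) # true: both listeners reported
```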
#shutdown? ⇒ Boolean
If true, this call also obtains the shutdown lock, so that no other listener closes at the same time as this one.
Returns whether shutdown can start on a given listener.

    # File 'lib/karafka/connection/consumer_group_coordinator.rb', line 32
    def shutdown?
      finished? && @shutdown_mutex.try_lock
    end
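`shutdown?` relies on `Mutex#try_lock`, which never blocks: it returns `true` and takes the lock when the lock is free, and returns `false` immediately otherwise. Because of the `&&`, the lock is not even attempted until the group is finished. A small sketch of the `try_lock` behaviour on its own, using a plain `Mutex` and a hypothetical `try_lock_demo` helper:

```ruby
# Hypothetical helper: returns the result of three try_lock attempts.
def try_lock_demo
  mutex = Mutex.new

  first = mutex.try_lock # lock is free, so this thread acquires it

  # Another thread tries while the lock is held: it gets false right away
  # instead of waiting.
  second = Thread.new { mutex.try_lock }.value

  mutex.unlock

  # Once released, another thread can acquire it.
  third = Thread.new do
    acquired = mutex.try_lock
    mutex.unlock if acquired
    acquired
  end.value

  [first, second, third]
end

p try_lock_demo # [true, false, true]
```

A listener that gets `false` can simply continue polling and ask again later, which matches the behaviour described in the overview.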
#unlock ⇒ Object
Unlocks the shutdown lock.

    # File 'lib/karafka/connection/consumer_group_coordinator.rb', line 37
    def unlock
      @shutdown_mutex.unlock if @shutdown_mutex.owned?
    end
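The `owned?` guard matters because `Mutex#unlock` raises `ThreadError` when the calling thread does not hold the lock. With the guard, `unlock` can be called unconditionally, for example from an `ensure` block, whether or not `shutdown?` ever returned true. A sketch with a plain `Mutex`; `safe_unlock` and `unguarded_unlock_raises?` are hypothetical helpers written for this example:

```ruby
# Hypothetical helper repeating the guarded unlock shown above.
def safe_unlock(mutex)
  mutex.unlock if mutex.owned?
end

# Hypothetical helper: true if a bare unlock raises ThreadError.
def unguarded_unlock_raises?(mutex)
  mutex.unlock
  false
rescue ThreadError
  true
end

mutex = Mutex.new

puts unguarded_unlock_raises?(mutex) # true: the lock was never taken
p safe_unlock(mutex)                 # nil: the guard skips the unlock

mutex.lock
safe_unlock(mutex)
puts mutex.locked? # false: the owning thread released it
```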