Class: Karafka::Instrumentation::Callbacks::Rebalance
- Inherits:
-
Object
- Object
- Karafka::Instrumentation::Callbacks::Rebalance
- Defined in:
- lib/karafka/instrumentation/callbacks/rebalance.rb
Overview
Callback that connects to the librdkafka rebalance callback and converts those events into our internal events
Instance Method Summary collapse
-
#initialize(subscription_group, client_id) ⇒ Rebalance
constructor
A new instance of Rebalance.
-
#on_partitions_assign(tpl) ⇒ Object
Publishes an event that partitions are going to be assigned.
-
#on_partitions_assigned(tpl) ⇒ Object
Publishes an event that partitions were assigned.
-
#on_partitions_revoke(tpl) ⇒ Object
Publishes an event that partitions are going to be revoked.
-
#on_partitions_revoked(tpl) ⇒ Object
Publishes an event that partitions were revoked.
Constructor Details
#initialize(subscription_group, client_id) ⇒ Rebalance
Returns a new instance of Rebalance.
16 17 18 19 |
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 16 def initialize(subscription_group, client_id) @subscription_group = subscription_group @client_id = client_id end |
Instance Method Details
#on_partitions_assign(tpl) ⇒ Object
Publishes an event that partitions are going to be assigned
32 33 34 |
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 32 def on_partitions_assign(tpl) instrument('partitions_assign', tpl) end |
#on_partitions_assigned(tpl) ⇒ Object
Publishes an event that partitions were assigned.
47 48 49 |
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 47 def on_partitions_assigned(tpl) instrument('partitions_assigned', tpl) end |
#on_partitions_revoke(tpl) ⇒ Object
Publishes an event that partitions are going to be revoked. At this stage we can still commit offsets, etc.
25 26 27 |
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 25 def on_partitions_revoke(tpl) instrument('partitions_revoke', tpl) end |
#on_partitions_revoked(tpl) ⇒ Object
Publishes an event that partitions were revoked. This is after we’ve lost them, so no option to commit offsets.
40 41 42 |
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 40 def on_partitions_revoked(tpl) instrument('partitions_revoked', tpl) end |