Class: Karafka::Pro::Processing::OffsetMetadata::Listener
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::OffsetMetadata::Listener
- Defined in:
- lib/karafka/pro/processing/offset_metadata/listener.rb
Overview
Keeps track of rebalances and updates the fetcher Since we cache the tpls with metadata, we need to invalidate them on events that would cause changes in the assignments
Instance Method Summary collapse
-
#on_connection_listener_before_fetch_loop(event) ⇒ Object
When we start listening we need to register this client in the metadata fetcher, so we have the client related to a given subscription group that we can use in fetcher since fetcher may be used in filtering API and other places outside of the standard consumer flow.
-
#on_rebalance_partitions_assigned(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata.
-
#on_rebalance_partitions_revoked(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata.
Instance Method Details
#on_connection_listener_before_fetch_loop(event) ⇒ Object
When we start listening we need to register this client in the metadata fetcher, so we have the client related to a given subscription group that we can use in fetcher since fetcher may be used in filtering API and other places outside of the standard consumer flow
19 20 21 |
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 19 def on_connection_listener_before_fetch_loop(event) Fetcher.register event[:client] end |
#on_rebalance_partitions_assigned(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata
25 26 27 |
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 25 def on_rebalance_partitions_assigned(event) Fetcher.clear event[:subscription_group] end |
#on_rebalance_partitions_revoked(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata
31 32 33 |
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 31 def on_rebalance_partitions_revoked(event) Fetcher.clear event[:subscription_group] end |