Class: Karafka::Pro::Processing::OffsetMetadata::Listener
- Inherits:
- Object
- Defined in:
- lib/karafka/pro/processing/offset_metadata/listener.rb
Overview
Keeps track of rebalances and updates the fetcher. Since we cache the TPLs (topic partition lists) with metadata, we need to invalidate them on events that would cause changes in the assignments.
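The caching and invalidation described above can be sketched roughly as follows. This is a minimal illustration and not the actual Karafka fetcher: `SketchFetcher`, `FakeClient`, `find`, and `committed_metadata` are hypothetical names, and keying the cache by `client.subscription_group` is an assumption made for the example.

```ruby
# Hypothetical stand-in for a Kafka client; it only counts how often it is queried.
FakeClient = Struct.new(:subscription_group, :queries) do
  # Pretends to query the broker for committed offsets with metadata.
  def committed_metadata
    self.queries += 1
    { ["events", 0] => "metadata-v#{queries}" }
  end
end

# Hypothetical fetcher sketch: caches metadata per subscription group.
class SketchFetcher
  def initialize
    @clients = {} # subscription group => registered client
    @cache = {}   # subscription group => cached metadata
    @mutex = Mutex.new
  end

  # Remembers which client serves a given subscription group (assumed keying).
  def register(client)
    @mutex.synchronize { @clients[client.subscription_group] = client }
  end

  # Returns cached metadata and queries the client only on a cache miss.
  def find(subscription_group)
    @mutex.synchronize do
      @cache[subscription_group] ||= @clients.fetch(subscription_group).committed_metadata
    end
  end

  # Drops the cached entry so the next lookup queries the client again.
  def clear(subscription_group)
    @mutex.synchronize { @cache.delete(subscription_group) }
  end
end

fetcher = SketchFetcher.new
client = FakeClient.new("sg1", 0)
fetcher.register(client)

fetcher.find("sg1")  # cache miss, queries the client
fetcher.find("sg1")  # served from the cache
fetcher.clear("sg1") # what a rebalance would trigger
fetcher.find("sg1")  # cache miss again after invalidation
```

Without the `clear` call, the second lookup after a rebalance would keep returning metadata for an assignment that may no longer be valid, which is the situation the listener is meant to prevent.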
Instance Method Summary
- #on_connection_listener_before_fetch_loop(event) ⇒ Object
  When we start listening, we need to register this client in the metadata fetcher so that the fetcher has a client tied to the given subscription group. This matters because the fetcher may be used in the filtering API and other places outside of the standard consumer flow.
- #on_rebalance_partitions_assigned(event) ⇒ Object
  Invalidates internal cache when assignments change so we can get correct metadata.
- #on_rebalance_partitions_revoked(event) ⇒ Object
  Invalidates internal cache when assignments change so we can get correct metadata.
Instance Method Details
#on_connection_listener_before_fetch_loop(event) ⇒ Object
When we start listening, we need to register this client in the metadata fetcher so that the fetcher has a client tied to the given subscription group. This matters because the fetcher may be used in the filtering API and other places outside of the standard consumer flow.
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 27
def on_connection_listener_before_fetch_loop(event)
  Fetcher.register event[:client]
end
#on_rebalance_partitions_assigned(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata.
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 33
def on_rebalance_partitions_assigned(event)
  Fetcher.clear event[:subscription_group]
end
#on_rebalance_partitions_revoked(event) ⇒ Object
Invalidates internal cache when assignments change so we can get correct metadata.
# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 39
def on_rebalance_partitions_revoked(event)
  Fetcher.clear event[:subscription_group]
end
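To see how the three handlers fit together, here is a self-contained sketch of the event flow. It does not use Karafka itself: `SketchMonitor`, `SketchListener`, and `RecordingFetcher` are hypothetical stand-ins, the event payloads are plain hashes rather than real event objects, and the mapping of dotted event names to `on_*` methods is an assumption made for the illustration.

```ruby
# Hypothetical fetcher stand-in that only records the calls it receives.
class RecordingFetcher
  attr_reader :calls

  def initialize
    @calls = []
  end

  def register(client)
    @calls << [:register, client]
  end

  def clear(subscription_group)
    @calls << [:clear, subscription_group]
  end
end

# Mirrors the three handlers documented above, with the fetcher injected
# so the sketch can run without Karafka.
class SketchListener
  def initialize(fetcher)
    @fetcher = fetcher
  end

  def on_connection_listener_before_fetch_loop(event)
    @fetcher.register event[:client]
  end

  def on_rebalance_partitions_assigned(event)
    @fetcher.clear event[:subscription_group]
  end

  def on_rebalance_partitions_revoked(event)
    @fetcher.clear event[:subscription_group]
  end
end

# Hypothetical dispatcher: "rebalance.partitions_revoked" is routed to
# #on_rebalance_partitions_revoked on every subscribed listener.
class SketchMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  def instrument(event_name, payload = {})
    handler = "on_#{event_name.tr('.', '_')}"
    @listeners.each do |listener|
      listener.public_send(handler, payload) if listener.respond_to?(handler)
    end
  end
end

fetcher = RecordingFetcher.new
monitor = SketchMonitor.new
monitor.subscribe(SketchListener.new(fetcher))

monitor.instrument("connection.listener.before_fetch_loop", { client: :client_a })
monitor.instrument("rebalance.partitions_assigned", { subscription_group: :sg1 })
monitor.instrument("rebalance.partitions_revoked", { subscription_group: :sg1 })
```

In this sketch the fetcher receives one `register` call followed by two `clear` calls for the same subscription group. Both rebalance events trigger the same invalidation because either one can change which partitions the cached metadata should cover.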