Class: Karafka::Pro::Processing::OffsetMetadata::Listener

Inherits:
Object
Defined in:
lib/karafka/pro/processing/offset_metadata/listener.rb

Overview

Keeps track of rebalances and updates the fetcher. Since we cache the tpls (topic partition lists) with metadata, we need to invalidate them on events that would cause changes in the assignments.

Instance Method Details

#on_connection_listener_before_fetch_loop(event) ⇒ Object

When listening starts, registers this client in the metadata fetcher. This gives the fetcher a client tied to the given subscription group, which it needs because the fetcher may be used in the filtering API and in other places outside of the standard consumer flow.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 19

def on_connection_listener_before_fetch_loop(event)
  Fetcher.register event[:client]
end
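
The registration step can be illustrated with a small stand-in that runs without Karafka. This is only a sketch: `ToyClient`, `ToyFetcher`, `ToyListener`, and `client_for` are hypothetical names, and it assumes that a client exposes the subscription group it belongs to, which the excerpt above does not show.

```ruby
# Hypothetical stand-ins; none of these classes are part of Karafka.
ToyClient = Struct.new(:subscription_group)

# Keeps one client per subscription group so that code outside the
# consumer flow can look it up later.
class ToyFetcher
  @clients = {}

  class << self
    # Assumption: the client knows which subscription group it belongs to.
    def register(client)
      @clients[client.subscription_group] = client
    end

    # Raises KeyError when no client was registered for the group.
    def client_for(subscription_group)
      @clients.fetch(subscription_group)
    end
  end
end

class ToyListener
  # Mirrors the shape of the handler above: the event is read like a hash.
  def on_connection_listener_before_fetch_loop(event)
    ToyFetcher.register(event[:client])
  end
end

client = ToyClient.new(:payments)
ToyListener.new.on_connection_listener_before_fetch_loop({ client: client })
registered = ToyFetcher.client_for(:payments)
```

After the handler runs, `registered` is the same object as `client`, so any later lookup by subscription group can reuse the connection that the listener started with.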

#on_rebalance_partitions_assigned(event) ⇒ Object

Invalidates the internal cache when partitions are assigned, so that subsequent lookups return correct metadata.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 25

def on_rebalance_partitions_assigned(event)
  Fetcher.clear event[:subscription_group]
end

#on_rebalance_partitions_revoked(event) ⇒ Object

Invalidates the internal cache when partitions are revoked, so that subsequent lookups return correct metadata.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/pro/processing/offset_metadata/listener.rb', line 31

def on_rebalance_partitions_revoked(event)
  Fetcher.clear event[:subscription_group]
end
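
Why both rebalance handlers clear the cache can be seen in a minimal, self-contained sketch. It does not use Karafka and is not the actual fetcher implementation: `ToyMetadataCache`, `ToyRebalanceListener`, the `loads` counter, and the string values are all hypothetical and exist only for illustration.

```ruby
# Hypothetical stand-in for a per-subscription-group metadata cache;
# not the Karafka implementation.
class ToyMetadataCache
  attr_reader :loads

  def initialize
    @cache = Hash.new { |hash, group| hash[group] = {} }
    @loads = 0
  end

  # Returns cached metadata, invoking the caller's loader block only on a miss.
  def fetch(group, partition)
    @cache[group].fetch(partition) do
      @loads += 1
      @cache[group][partition] = yield
    end
  end

  # Drops everything cached for one subscription group.
  def clear(group)
    @cache.delete(group)
  end
end

class ToyRebalanceListener
  def initialize(cache)
    @cache = cache
  end

  def on_rebalance_partitions_assigned(event)
    @cache.clear(event[:subscription_group])
  end

  # Revocation changes the assignment as well, so it is handled identically.
  alias on_rebalance_partitions_revoked on_rebalance_partitions_assigned
end

cache = ToyMetadataCache.new
listener = ToyRebalanceListener.new(cache)

cache.fetch(:payments, 0) { "offset-meta-v1" }          # miss: the loader runs
stale = cache.fetch(:payments, 0) { "offset-meta-v2" }  # hit: the loader is skipped

listener.on_rebalance_partitions_revoked({ subscription_group: :payments })
fresh = cache.fetch(:payments, 0) { "offset-meta-v2" }  # miss again after the clear
```

Without the `clear` call, the second lookup's cached value would keep being served even after another process took over the partition and stored newer metadata.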