Class: Karafka::Pro::Processing::OffsetMetadata::Fetcher

Inherits:
Object
Extended by:
Forwardable
Includes:
Singleton
Defined in:
lib/karafka/pro/processing/offset_metadata/fetcher.rb

Overview

This fetcher is responsible for fetching and caching committed offset metadata.

By design, we fetch all information for the assignments of a requested topic. Not all topics from the same subscription group may need metadata, and even if they do, we can run a few smaller queries. This approach prevents us from querying the data of all assigned topics in one go, which avoids excessive queries.

The assumption is that the user will not need to reach for later metadata, since it is produced in the context of a given consumer assignment. Because of that, we can cache the initial result and only allow users to invalidate it explicitly.
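The caching approach described above can be illustrated with a minimal sketch. The `CachedLookup` class, its `fetch` and `clear` methods, and the loader block are hypothetical names used only for this illustration; they are not part of the Karafka API, and the loader simply stands in for an actual query to Kafka.

```ruby
# Hypothetical illustration of "cache the first result, invalidate explicitly".
# This is not the Karafka implementation.
class CachedLookup
  def initialize(&loader)
    @loader = loader # stands in for the real query to Kafka
    @cache = {}
    @mutex = Mutex.new
  end

  # Returns the cached value for a topic. The loader runs only on a cache
  # miss or when the caller passes cache: false to force a refresh.
  def fetch(topic_name, cache: true)
    @mutex.synchronize do
      @cache.delete(topic_name) unless cache
      @cache[topic_name] ||= @loader.call(topic_name)
    end
  end

  # Explicit invalidation, comparable to what happens on assignment changes
  def clear
    @mutex.synchronize { @cache.clear }
  end
end

queries = []
lookup = CachedLookup.new do |topic_name|
  queries << topic_name
  "metadata-for-#{topic_name}"
end

lookup.fetch('events')               # miss: runs the loader
lookup.fetch('events')               # hit: served from the cache
lookup.fetch('events', cache: false) # forced refresh: runs the loader again
lookup.clear
lookup.fetch('events')               # miss again after invalidation

queries.size # the loader ran 3 times for 4 lookups
```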

Instance Method Summary

Constructor Details

#initialize ⇒ Fetcher

Returns a new instance of Fetcher.



# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 31

def initialize
  @mutexes = {}
  @clients = {}
  @tpls = {}
end

Instance Method Details

#clear(subscription_group) ⇒ Object

Clears the cache of a given subscription group. It is triggered on assignment changes.

Parameters:

  • subscription_group

    subscription group whose cache we want to clear



# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 79

def clear(subscription_group)
  @mutexes.fetch(subscription_group).synchronize do
    @tpls[subscription_group].clear
  end
end

#find(topic, partition, cache: true) ⇒ Object, false

Queries Kafka for, or retrieves from the cache, the offset metadata of the selected partition.

Parameters:

  • topic (Karafka::Routing::Topic)

    routing topic with subscription group reference

  • partition (Integer)

    partition for which we want to get stored offset metadata

  • cache (Boolean) (defaults to: true)

    when false, forces an explicit query to Kafka and a cache refresh. By default we use the topic-level setting, but this can be overridden on a per-request basis if needed.

Returns:

  • (Object, false)

    deserialized metadata (string deserializer by default) or false in case we were not able to obtain the details because we have lost the assignment



# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 58

def find(topic, partition, cache: true)
  cache = topic.offset_metadata.cache? && cache

  tpls = fetch(topic, cache)

  return false unless tpls

  t_partitions = tpls.fetch(topic.name, [])
  t_partition = t_partitions.find { |t_p| t_p.partition == partition }

  # If we do not have given topic partition here, it means it is no longer part of our
  # assignment and we should return false
  return false unless t_partition

  topic.offset_metadata.deserializer.call(t_partition.metadata)
end
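
The lookup performed by #find can be sketched in isolation as follows. This is an illustration under stated assumptions, not the library code: `PartitionInfo`, its `metadata` field, and the `lookup_metadata` helper are hypothetical stand-ins for the objects held in the cache, and the deserializer is a plain lambda.

```ruby
# Stand-in for the per-partition objects held in the cache. The struct and
# its field names are assumptions made for this sketch.
PartitionInfo = Struct.new(:partition, :metadata)

# Mirrors the lookup steps of #find: no data or no matching partition means
# the assignment was lost, so false is returned instead of metadata.
def lookup_metadata(tpls, topic_name, partition, deserializer)
  return false unless tpls

  t_partition = tpls
                .fetch(topic_name, [])
                .find { |t_p| t_p.partition == partition }

  return false unless t_partition

  deserializer.call(t_partition.metadata)
end

tpls = {
  'events' => [PartitionInfo.new(0, 'checkpoint-a'), PartitionInfo.new(1, 'checkpoint-b')]
}

# Hypothetical string deserializer
to_string = ->(raw) { raw.to_s }

found = lookup_metadata(tpls, 'events', 1, to_string)
missing_partition = lookup_metadata(tpls, 'events', 5, to_string)
missing_topic = lookup_metadata(tpls, 'logs', 0, to_string)
no_data = lookup_metadata(nil, 'events', 0, to_string)
```

Because `false` is a legitimate return value, callers need to check for it explicitly before treating the result as metadata.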

#register(client) ⇒ Object

Note:

Since we store the client reference and not the underlying rdkafka consumer instance, we do not have to deal with recovery, as it is abstracted away.

Registers a client of a given subscription group, so we can use it for queries later on

Parameters:

  • client

    client of a given subscription group that we register for later queries



# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 41

def register(client)
  @clients[client.subscription_group] = client
  # We use one mutex per SG because independent SGs can query in parallel
  @mutexes[client.subscription_group] = Mutex.new
  @tpls[client.subscription_group] = {}
end
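
How #register and #clear interact can be shown with a small standalone sketch. The `Registry` class and `FakeClient` struct below are hypothetical stand-ins that copy only the bookkeeping shown above; they do not talk to Kafka. The sketch shows that each subscription group gets its own mutex and cache, and that clearing a group that was never registered raises `KeyError`, because `Hash#fetch` is called without a default.

```ruby
# Hypothetical stand-in for a client; only the subscription group is needed here.
FakeClient = Struct.new(:subscription_group)

# Copies the register/clear bookkeeping for illustration purposes only.
class Registry
  attr_reader :tpls

  def initialize
    @mutexes = {}
    @clients = {}
    @tpls = {}
  end

  def register(client)
    @clients[client.subscription_group] = client
    # One mutex per subscription group, so independent groups do not block each other
    @mutexes[client.subscription_group] = Mutex.new
    @tpls[client.subscription_group] = {}
  end

  def clear(subscription_group)
    @mutexes.fetch(subscription_group).synchronize do
      @tpls[subscription_group].clear
    end
  end
end

registry = Registry.new
registry.register(FakeClient.new(:sg_a))
registry.register(FakeClient.new(:sg_b))

# Simulate cached data for both groups
registry.tpls[:sg_a]['events'] = [:cached]
registry.tpls[:sg_b]['logs'] = [:cached]

# Clearing one group leaves the other group's cache untouched
registry.clear(:sg_a)

# Clearing a group that was never registered fails on the mutex lookup
unregistered_error =
  begin
    registry.clear(:sg_unknown)
    nil
  rescue KeyError => e
    e
  end
```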