Class: Rdkafka::Producer::PartitionsCountCache
- Inherits: Object
- Includes: Helpers::Time
- Defined in: lib/rdkafka/producer/partitions_count_cache.rb
Overview
Caching mechanism for Kafka topic partition counts to avoid frequent cluster queries.

This cache is designed to optimize the process of obtaining partition counts for topics. It uses several strategies to minimize Kafka cluster queries.

Design considerations:
- Statistics-based updates: When statistics callbacks are enabled (via statistics.interval.ms), we leverage this data to proactively update the partition counts cache. This approach costs approximately 0.02ms of processing time during each statistics interval (typically every 5 seconds) but eliminates the need for explicit blocking metadata queries.
- Edge case handling: If a user configures statistics.interval.ms much higher than the default cache TTL (30 seconds), the cache will still function correctly. When statistics updates don't occur frequently enough, the cache entries will expire naturally, triggering a blocking refresh when needed.
- User configuration awareness: The cache respects user-defined settings. If topic.metadata.refresh.interval.ms is set very high, the responsibility for potentially stale data falls on the user. This is an explicit design choice to honor user configuration preferences and align with librdkafka settings.
- Process-wide efficiency: Since this cache is shared across all Rdkafka producers and consumers within a process, having multiple clients improves overall efficiency. Each client contributes to keeping the cache updated, benefiting all other clients.
- Thread-safety approach: The implementation uses fine-grained locking with per-topic mutexes to minimize contention in multi-threaded environments while ensuring data consistency.
- Topic recreation handling: If a topic is deleted and recreated with fewer partitions, the cache will continue to report the higher count until either the TTL expires or the process is restarted. This design choice simplifies the implementation while relying on librdkafka's error handling for edge cases. In production environments, topic recreation with different partition counts is typically accompanied by application restarts to handle structural changes. This also aligns with the previous cache implementation.
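To make the read-through behavior concrete, here is a minimal usage sketch. Only get and set are part of this class's API; fetch_partition_count and the topic names and counts below are hypothetical stand-ins for a blocking metadata query.

```ruby
require "rdkafka"

cache = Rdkafka::Producer::PartitionsCountCache.new

# Hypothetical helper standing in for a blocking metadata request
# against the Kafka cluster; not part of this class's API.
def fetch_partition_count(topic)
  6
end

# Cache miss: the block runs and its result is stored.
cache.get("events") { fetch_partition_count("events") } # => 6

# Within the TTL the cached value is returned without running the block.
cache.get("events") { fetch_partition_count("events") } # => 6

# A statistics callback (or any other source) can push fresher counts in,
# keeping entries warm so reads stay non-blocking.
cache.set("events", 12)
cache.get("events") { fetch_partition_count("events") } # => 12
```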
Constant Summary
- DEFAULT_TTL = 30

  Default time-to-live for cached partition counts in seconds.

  Note: This default was chosen to balance freshness of metadata with performance optimization. Most Kafka cluster topology changes are planned operations, making 30 seconds a reasonable compromise.
Instance Method Summary
- #get(topic) { ... } ⇒ Integer: Reads partition count for a topic with automatic refresh when expired.
- #initialize(ttl = DEFAULT_TTL) ⇒ PartitionsCountCache (constructor): Creates a new partition count cache.
- #set(topic, new_count) ⇒ Object: Updates the partition count for a topic when needed.
- #to_h ⇒ Hash: Returns a hash with cached timestamps and partition counts per topic.
Methods included from Helpers::Time
#monotonic_now
Constructor Details
#initialize(ttl = DEFAULT_TTL) ⇒ PartitionsCountCache
Creates a new partition count cache.

    # File 'lib/rdkafka/producer/partitions_count_cache.rb', line 59

    def initialize(ttl = DEFAULT_TTL)
      @counts = {}
      @mutex_hash = {}
      # Used only for @mutex_hash access to ensure thread-safety when creating new mutexes
      @mutex_for_hash = Mutex.new
      @ttl = ttl
    end
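For instance, a process that tolerates staler metadata could construct the cache with a longer TTL; the 120-second value below is only illustrative.

```ruby
require "rdkafka"

# Cache partition counts for two minutes instead of the default 30 seconds.
cache = Rdkafka::Producer::PartitionsCountCache.new(120)
```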
Instance Method Details
#get(topic) { ... } ⇒ Integer
Reads partition count for a topic with automatic refresh when expired.

This method will return the cached partition count if available and not expired. If the value is expired or not available, it will execute the provided block to fetch the current value from Kafka.

Note: The implementation prioritizes read performance over write consistency since partition counts typically only increase during normal operation.
    # File 'lib/rdkafka/producer/partitions_count_cache.rb', line 80

    def get(topic)
      current_info = @counts[topic]

      if current_info.nil? || expired?(current_info[0])
        new_count = yield

        if current_info.nil?
          # No existing data, create a new entry with mutex
          set(topic, new_count)

          return new_count
        else
          current_count = current_info[1]

          if new_count > current_count
            # Higher value needs mutex to update both timestamp and count
            set(topic, new_count)

            return new_count
          else
            # Same or lower value, just update timestamp without mutex
            refresh_timestamp(topic)

            return current_count
          end
        end
      end

      current_info[1]
    end
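A short sketch of the read path, assuming a fresh cache and an illustrative topic name: the block stands in for a blocking metadata query and runs only when the entry is missing or older than the TTL.

```ruby
require "rdkafka"

cache = Rdkafka::Producer::PartitionsCountCache.new(30)

lookups = 0

# The block simulates a blocking metadata query; it is only executed
# when the topic is missing from the cache or its entry has expired.
cache.get("orders") { lookups += 1; 3 } # => 3 (block executed, lookups == 1)
cache.get("orders") { lookups += 1; 3 } # => 3 (served from cache, lookups still 1)
```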
#set(topic, new_count) ⇒ Object
Updates the partition count for a topic when needed.

This method updates the partition count for a topic in the cache. It uses a mutex to ensure thread-safety during updates.

Note: We prioritize higher partition counts and only accept them when using a mutex to ensure consistency. This design decision is based on the fact that partition counts in Kafka only increase during normal operation.
    # File 'lib/rdkafka/producer/partitions_count_cache.rb', line 122

    def set(topic, new_count)
      # First check outside mutex to avoid unnecessary locking
      current_info = @counts[topic]

      # For lower values, we don't update count but might need to refresh timestamp
      if current_info && new_count < current_info[1]
        refresh_timestamp(topic)

        return
      end

      # Only lock the specific topic mutex
      mutex_for(topic).synchronize do
        # Check again inside the lock as another thread might have updated
        current_info = @counts[topic]

        if current_info.nil?
          # Create new entry
          @counts[topic] = [monotonic_now, new_count]
        else
          current_count = current_info[1]

          if new_count > current_count
            # Update to higher count value
            current_info[0] = monotonic_now
            current_info[1] = new_count
          else
            # Same or lower count, update timestamp only
            current_info[0] = monotonic_now
          end
        end
      end
    end
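An illustrative interaction between set and get under the "higher counts win" rule; the topic name and counts are made up.

```ruby
require "rdkafka"

cache = Rdkafka::Producer::PartitionsCountCache.new

cache.set("payments", 6)
cache.set("payments", 3)   # Lower value: count stays at 6, only the timestamp is refreshed
cache.set("payments", 12)  # Higher value: count is updated to 12

# While the entry is fresh, the block is never invoked.
cache.get("payments") { raise "not reached while the entry is fresh" } # => 12
```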
#to_h ⇒ Hash
Returns a hash mapping each topic to its cached entry: a two-element array with the last-refresh timestamp and the partition count.
    # File 'lib/rdkafka/producer/partitions_count_cache.rb', line 157

    def to_h
      @counts
    end
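Based on the structure built in #set, the returned hash has roughly the shape shown below; the topic name, timestamp, and count are illustrative.

```ruby
require "rdkafka"

cache = Rdkafka::Producer::PartitionsCountCache.new
cache.set("events", 6)

cache.to_h
# => { "events" => [123456.789, 6] }
#    topic name => [monotonic timestamp of last refresh, partition count]
```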