Class: Rdkafka::Producer::PartitionsCountCache

Inherits:
Object
Includes:
Helpers::Time
Defined in:
lib/rdkafka/producer/partitions_count_cache.rb

Overview

Caching mechanism for Kafka topic partition counts to avoid frequent cluster queries.

This cache is designed to optimize the process of obtaining partition counts for topics. It uses several strategies to minimize Kafka cluster queries:

Note: Design considerations:

  1. Statistics-based updates: When statistics callbacks are enabled (via statistics.interval.ms), we leverage this data to proactively update the partition counts cache. This approach costs approximately 0.02ms of processing time during each statistics interval (typically every 5 seconds) but eliminates the need for explicit blocking metadata queries.

  2. Edge case handling: If a user configures statistics.interval.ms much higher than the default cache TTL (30 seconds), the cache will still function correctly. When statistics updates don't occur frequently enough, the cache entries will expire naturally, triggering a blocking refresh when needed.

  3. User configuration awareness: The cache respects user-defined settings. If topic.metadata.refresh.interval.ms is set very high, the responsibility for potentially stale data falls on the user. This is an explicit design choice to honor user configuration preferences and align with librdkafka settings.

  4. Process-wide efficiency: Since this cache is shared across all Rdkafka producers and consumers within a process, having multiple clients improves overall efficiency. Each client contributes to keeping the cache updated, benefiting all other clients.

  5. Thread-safety approach: The implementation uses fine-grained locking with per-topic mutexes to minimize contention in multi-threaded environments while ensuring data consistency.

  6. Topic recreation handling: If a topic is deleted and recreated with fewer partitions, the cache will continue to report the higher count until either the TTL expires or the process is restarted. This design choice simplifies the implementation while relying on librdkafka's error handling for edge cases. In production environments, topic recreation with different partition counts is typically accompanied by application restarts to handle structural changes. This also aligns with the previous cache implementation.
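The fine-grained locking described in point 5 can be sketched in isolation. The snippet below is an illustrative stand-alone example of the pattern, not the gem's internal code: a single registry mutex guards the creation of per-topic mutexes, so concurrent callers asking about the same topic always receive the same lock object, while different topics never contend with each other.

```ruby
# Illustrative sketch of per-topic locking (design point 5 above).
# A single registry mutex guards creation of per-topic mutexes, so two
# threads asking for the same topic always receive the same Mutex object.
class PerTopicLocks
  def initialize
    @mutex_hash = {}
    @mutex_for_hash = Mutex.new # guards @mutex_hash itself
  end

  # Returns the mutex dedicated to this topic, creating it on first use.
  def mutex_for(topic)
    @mutex_for_hash.synchronize { @mutex_hash[topic] ||= Mutex.new }
  end
end

locks = PerTopicLocks.new
locks.mutex_for("orders").synchronize do
  # critical section touching only the "orders" cache entry
end
```

Because only mutex creation goes through the registry lock, updates to unrelated topics proceed fully in parallel.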

Constant Summary

DEFAULT_TTL = 30

Note: This default was chosen to balance freshness of metadata with performance optimization. Most Kafka cluster topology changes are planned operations, making 30 seconds a reasonable compromise.

Default time-to-live for cached partition counts, in seconds.

Instance Method Summary

Methods included from Helpers::Time

#monotonic_now

Constructor Details

#initialize(ttl = DEFAULT_TTL) ⇒ PartitionsCountCache

Creates a new partition count cache

Parameters:

  • ttl (Integer) (defaults to: DEFAULT_TTL)

    Time-to-live in seconds for cached values



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 59

def initialize(ttl = DEFAULT_TTL)
  @counts = {}
  @mutex_hash = {}
  # Used only for @mutex_hash access to ensure thread-safety when creating new mutexes
  @mutex_for_hash = Mutex.new
  @ttl = ttl
end

Instance Method Details

#get(topic) { ... } ⇒ Integer

Note:

The implementation prioritizes read performance over write consistency since partition counts typically only increase during normal operation.

Reads partition count for a topic with automatic refresh when expired

This method will return the cached partition count if available and not expired. If the value is expired or not available, it will execute the provided block to fetch the current value from Kafka.

Parameters:

  • topic (String)

    Kafka topic name

Yields:

  • Block that returns the current partition count when cache needs refreshing

Yield Returns:

  • (Integer)

    Current partition count retrieved from Kafka

Returns:

  • (Integer)

    Partition count for the topic



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 80

def get(topic)
  current_info = @counts[topic]

  if current_info.nil? || expired?(current_info[0])
    new_count = yield

    if current_info.nil?
      # No existing data, create a new entry with mutex
      set(topic, new_count)

      return new_count
    else
      current_count = current_info[1]

      if new_count > current_count
        # Higher value needs mutex to update both timestamp and count
        set(topic, new_count)

        return new_count
      else
        # Same or lower value, just update timestamp without mutex
        refresh_timestamp(topic)

        return current_count
      end
    end
  end

  current_info[1]
end
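The read-through contract above can be demonstrated with a simplified, self-contained cache. This is a sketch of the documented behavior only (no per-topic mutexes, and `TinyPartitionsCache` is an illustrative name, not part of the gem): the block, which stands in for a blocking Kafka metadata fetch, runs only on a cache miss or after the TTL elapses.

```ruby
# Simplified sketch of #get's read-through contract: return the cached
# count while fresh; invoke the block (a Kafka metadata query in the real
# class) only on a miss or after the TTL expires.
class TinyPartitionsCache
  def initialize(ttl)
    @ttl = ttl
    @counts = {} # topic => [monotonic timestamp, partition count]
  end

  def get(topic)
    entry = @counts[topic]
    return entry[1] if entry && monotonic_now - entry[0] < @ttl

    count = yield
    @counts[topic] = [monotonic_now, count]
    count
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

cache = TinyPartitionsCache.new(0.05) # 50ms TTL for demonstration
fetches = 0
cache.get("events") { fetches += 1; 12 } # miss: block runs
cache.get("events") { fetches += 1; 12 } # fresh hit: block skipped
sleep(0.06)                              # let the entry expire
cache.get("events") { fetches += 1; 12 } # expired: block runs again
fetches # => 2
```

The real method additionally compares the freshly fetched count with the cached one, as described under #set below, so a stale lower count never overwrites a higher cached value.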

#set(topic, new_count) ⇒ Object

Note:

We prioritize higher partition counts and only accept them when using a mutex to ensure consistency. This design decision is based on the fact that partition counts in Kafka only increase during normal operation.

Update partition count for a topic when needed

This method updates the partition count for a topic in the cache. It uses a mutex to ensure thread-safety during updates.

Parameters:

  • topic (String)

    Kafka topic name

  • new_count (Integer)

    New partition count value



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 122

def set(topic, new_count)
  # First check outside mutex to avoid unnecessary locking
  current_info = @counts[topic]

  # For lower values, we don't update count but might need to refresh timestamp
  if current_info && new_count < current_info[1]
    refresh_timestamp(topic)

    return
  end

  # Only lock the specific topic mutex
  mutex_for(topic).synchronize do
    # Check again inside the lock as another thread might have updated
    current_info = @counts[topic]

    if current_info.nil?
      # Create new entry
      @counts[topic] = [monotonic_now, new_count]
    else
      current_count = current_info[1]

      if new_count > current_count
        # Update to higher count value
        current_info[0] = monotonic_now
        current_info[1] = new_count
      else
        # Same or lower count, update timestamp only
        current_info[0] = monotonic_now
      end
    end
  end
end
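The "higher count wins" rule can be shown in isolation. The toy function below (illustrative, not the gem's code) applies the same decision table to a topic => [timestamp, count] hash: a lower reported count only bumps the timestamp, while a higher count replaces both fields.

```ruby
# Toy version of #set's decision rule: a new, lower count only refreshes
# the timestamp; only a higher count replaces the cached value.
def apply_count(counts, topic, new_count, now)
  entry = counts[topic]
  if entry.nil? || new_count > entry[1]
    counts[topic] = [now, new_count] # new entry or higher count
  else
    entry[0] = now                   # keep higher count, bump timestamp
  end
  counts
end

counts = {}
apply_count(counts, "orders", 6, 100.0) # => { "orders" => [100.0, 6] }
apply_count(counts, "orders", 3, 200.0) # lower count: timestamp only
counts["orders"]                        # => [200.0, 6]
```

This matches design point 6: after a topic is recreated with fewer partitions, the higher cached count survives until the entry's TTL expires.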

#to_h ⇒ Hash

Returns the underlying cache hash, mapping each topic name to a [timestamp, partition count] pair.

Returns:

  • (Hash)

    hash mapping topic names to [timestamp, partition count] pairs



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 157

def to_h
  @counts
end
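For reference, a snapshot returned by #to_h has the following shape. The topic names and numbers here are made up; the first element of each pair is the monotonic timestamp recorded at the last update, the second is the cached partition count.

```ruby
# Hypothetical snapshot of the hash #to_h exposes.
snapshot = {
  "events" => [12_345.678, 12],
  "orders" => [12_350.001, 6]
}
snapshot["events"][1] # => 12 (cached partition count for "events")
```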