Module: WaterDrop::Patches::Rdkafka::Metadata
- Defined in:
- lib/waterdrop/patches/rdkafka/metadata.rb
Overview
Rdkafka::Metadata patches
Instance Method Summary collapse
-
#initialize(*args) ⇒ Object
We overwrite this method because there were reports of metadata operation timing out when Kafka was under stress.
Instance Method Details
#initialize(*args) ⇒ Object
We overwrite this method because there were reports of metadata operation timing out when Kafka was under stress. While the messages dispatch will be retried, metadata fetch happens prior to that, effectively crashing the process. Metadata fetch was not being retried at all.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/waterdrop/patches/rdkafka/metadata.rb', line 24 def initialize(*args) attempt ||= 0 attempt += 1 super(*args) rescue ::Rdkafka::RdkafkaError => e raise unless RETRIED_ERRORS.include?(e.code) raise if attempt > 10 backoff_factor = 2**attempt timeout = backoff_factor * 0.1 sleep(timeout) retry end |