Class: Karafka::Connection::Proxy
- Inherits:
-
SimpleDelegator
- Object
- SimpleDelegator
- Karafka::Connection::Proxy
- Defined in:
- lib/karafka/connection/proxy.rb
Overview
Usually it is ok to use the Rdkafka::Consumer
directly because we need 1:1 its functionality. There are however cases where we want to have extra recoveries or other handling of errors and settings. This is where this module comes in handy.
We do not want to wrap and delegate all via a proxy object for performance reasons, but we do still want to be able to alter some functionalities. This wrapper helps us do it when it would be needed
Instance Attribute Summary collapse
-
#wrapped ⇒ Object
(also: #__getobj__)
Returns the value of attribute wrapped.
Instance Method Summary collapse
-
#commit_offsets(tpl = nil, async: true) ⇒ Boolean
Non thread-safe message committing method.
-
#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList
Similar to
#query_watermark_offsets
. -
#initialize(obj) ⇒ Proxy
constructor
A new instance of Proxy.
-
#lag(tpl) ⇒ Hash<String, Hash>
Hash with topics and their partitions lags.
-
#metadata(topic_name = nil) ⇒ Rdkafka::Metadata
Rdkafka metadata object with the requested details.
-
#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList
Similar to
#query_watermark_offsets
, this method can be sensitive to latency. -
#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>
Proxies the
#query_watermark_offsets
with extra recovery from timeout problems. -
#store_offset(message, metadata = nil) ⇒ Boolean
When we cannot store an offset, it means we no longer own the partition.
Constructor Details
#initialize(obj) ⇒ Proxy
Returns a new instance of Proxy.
33 34 35 36 37 38 39 40 |
# File 'lib/karafka/connection/proxy.rb', line 33 def initialize(obj) super # Do not allow for wrapping proxy with a proxy. This will prevent a case where we might # wrap an already wrapped object with another proxy level. Simplifies passing consumers # and makes it safe to wrap without type checking @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj @config = ::Karafka::App.config.internal.connection.proxy end |
Instance Attribute Details
#wrapped ⇒ Object Also known as: __getobj__
Returns the value of attribute wrapped.
28 29 30 |
# File 'lib/karafka/connection/proxy.rb', line 28 def wrapped @wrapped end |
Instance Method Details
#commit_offsets(tpl = nil, async: true) ⇒ Boolean
We do not consider no_offset
as any problem and we allow to commit offsets even when no stored, because with sync commit, it refreshes the ownership state of the consumer in a sync way.
Non thread-safe message committing method
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/karafka/connection/proxy.rb', line 122 def commit_offsets(tpl = nil, async: true) c_config = @config.commit with_broker_errors_retry( wait_time: c_config.wait_time / 1_000.to_f, max_attempts: c_config.max_attempts ) do @wrapped.commit(tpl, async) end true rescue Rdkafka::RdkafkaError => e case e.code when :assignment_lost return false when :unknown_member_id return false when :no_offset return true when :coordinator_load_in_progress sleep(1) retry end raise e end |
#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList
Similar to #query_watermark_offsets
.
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/karafka/connection/proxy.rb', line 86 def committed(tpl = nil) c_config = @config.committed with_broker_errors_retry( # required to be in seconds, not ms wait_time: c_config.wait_time / 1_000.to_f, max_attempts: c_config.max_attempts ) do @wrapped.committed(tpl, c_config.timeout) end end |
#lag(tpl) ⇒ Hash<String, Hash>
Returns hash with topics and their partitions lags.
152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/karafka/connection/proxy.rb', line 152 def lag(tpl) l_config = @config.committed with_broker_errors_retry( # required to be in seconds, not ms wait_time: l_config.wait_time / 1_000.to_f, max_attempts: l_config.max_attempts ) do @wrapped.lag(tpl, l_config.timeout) end end |
#metadata(topic_name = nil) ⇒ Rdkafka::Metadata
Returns rdkafka metadata object with the requested details.
167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/karafka/connection/proxy.rb', line 167 def (topic_name = nil) m_config = @config. with_broker_errors_retry( # required to be in seconds, not ms wait_time: m_config.wait_time / 1_000.to_f, max_attempts: m_config.max_attempts ) do @wrapped.(topic_name, m_config.timeout) end end |
#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList
Similar to #query_watermark_offsets
, this method can be sensitive to latency. We handle this the same way
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/karafka/connection/proxy.rb', line 69 def offsets_for_times(tpl) l_config = @config.offsets_for_times with_broker_errors_retry( # required to be in seconds, not ms wait_time: l_config.wait_time / 1_000.to_f, max_attempts: l_config.max_attempts ) do @wrapped.offsets_for_times(tpl, l_config.timeout) end end |
#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>
Proxies the #query_watermark_offsets
with extra recovery from timeout problems. We impose our own custom timeout to make sure, that high-latency clusters and overloaded clusters can handle our requests.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/karafka/connection/proxy.rb', line 49 def query_watermark_offsets(topic, partition) l_config = @config.query_watermark_offsets # For newly created topics or in cases where we're trying to get them but there is no # leader, this can fail. It happens more often for new topics under KRaft, however we # still want to make sure things operate as expected even then with_broker_errors_retry( # required to be in seconds, not ms wait_time: l_config.wait_time / 1_000.to_f, max_attempts: l_config.max_attempts ) do @wrapped.query_watermark_offsets(topic, partition, l_config.timeout) end end |
#store_offset(message, metadata = nil) ⇒ Boolean
When we cannot store an offset, it means we no longer own the partition
Non thread-safe offset storing method
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/karafka/connection/proxy.rb', line 104 def store_offset(, = nil) @wrapped.store_offset(, ) true rescue Rdkafka::RdkafkaError => e return false if e.code == :assignment_lost return false if e.code == :state raise e end |