Class: Karafka::Connection::Proxy

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/karafka/connection/proxy.rb

Overview

Using Rdkafka::Consumer directly is usually fine, because we need its functionality 1:1. There are, however, cases where we want extra recovery or other handling of errors and settings. This is where this class comes in handy.

We do not want to wrap and delegate everything through a proxy object, for performance reasons, but we still want to be able to alter some functionality. This wrapper lets us do so where needed.

Constructor Details

#initialize(obj) ⇒ Proxy

Returns a new instance of Proxy.

Parameters:

  • obj (Rdkafka::Consumer, Proxy)

    rdkafka consumer or consumer wrapped with proxy



# File 'lib/karafka/connection/proxy.rb', line 33

def initialize(obj)
  super
  # Do not allow for wrapping proxy with a proxy. This will prevent a case where we might
  # wrap an already wrapped object with another proxy level. Simplifies passing consumers
  # and makes it safe to wrap without type checking
  @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
  @config = ::Karafka::App.config.internal.connection.proxy
end
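
As a rough illustration of the unwrapping above, the following standalone sketch shows that wrapping an already wrapped object does not add a second proxy level. `SketchProxy` and `FakeConsumer` are hypothetical stand-ins introduced here, not Karafka or rdkafka classes, and the sketch leaves out the configuration lookup.

```ruby
require 'delegate'

# Hypothetical stand-in for Rdkafka::Consumer
class FakeConsumer
  def name
    'consumer'
  end
end

# Hypothetical proxy that mirrors only the unwrapping logic of the constructor
class SketchProxy < SimpleDelegator
  attr_accessor :wrapped

  # Delegation goes through the unwrapped object
  alias __getobj__ wrapped

  def initialize(obj)
    super
    # If we were given a proxy, reuse the object it wraps
    @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
  end
end

consumer = FakeConsumer.new
once = SketchProxy.new(consumer)
twice = SketchProxy.new(once)

# Both proxies point directly at the same underlying consumer
puts twice.wrapped.equal?(consumer)
puts twice.name
```

Because the constructor unwraps its argument, callers can wrap whatever they receive without checking its type first.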

Instance Attribute Details

#wrapped ⇒ Object Also known as: __getobj__

Returns the value of attribute wrapped.



# File 'lib/karafka/connection/proxy.rb', line 28

def wrapped
  @wrapped
end

Instance Method Details

#commit_offsets(tpl = nil, async: true) ⇒ Boolean

Note:

We do not treat no_offset as a problem, and we allow committing offsets even when none are stored, because a sync commit refreshes the ownership state of the consumer synchronously.

Non thread-safe message committing method

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList, nil) (defaults to: nil)

    tpl or nil

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

    true if offset commit worked, false if we’ve lost the assignment



# File 'lib/karafka/connection/proxy.rb', line 122

def commit_offsets(tpl = nil, async: true)
  c_config = @config.commit

  with_broker_errors_retry(
    wait_time: c_config.wait_time / 1_000.to_f,
    max_attempts: c_config.max_attempts
  ) do
    @wrapped.commit(tpl, async)
  end

  true
rescue Rdkafka::RdkafkaError => e
  case e.code
  when :assignment_lost
    return false
  when :unknown_member_id
    return false
  when :no_offset
    return true
  when :coordinator_load_in_progress
    sleep(1)
    retry
  end

  raise e
end
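
The mapping from error codes to return values can be exercised without a broker. The sketch below is a simplified imitation, not the Karafka implementation: `SketchError` and `FlakyConsumer` are hypothetical stand-ins for `Rdkafka::RdkafkaError` and the wrapped consumer, the broker-error retry wrapper is left out, and the `pause` parameter exists only so the sketch runs quickly.

```ruby
# Hypothetical stand-in for Rdkafka::RdkafkaError, exposing only #code
class SketchError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super(code.to_s)
  end
end

# Hypothetical consumer stub: raises one queued error code per #commit call
# and succeeds once the queue is empty
class FlakyConsumer
  attr_reader :calls

  def initialize(*codes)
    @codes = codes
    @calls = 0
  end

  def commit(_tpl, _async)
    @calls += 1
    code = @codes.shift
    raise SketchError, code if code

    nil
  end
end

# Mirrors the result mapping of #commit_offsets shown above
def sketch_commit(consumer, tpl = nil, async: true, pause: 0)
  consumer.commit(tpl, async)
  true
rescue SketchError => e
  case e.code
  when :assignment_lost, :unknown_member_id
    false # the assignment is gone, nothing was committed
  when :no_offset
    true # nothing stored to commit, not treated as a failure
  when :coordinator_load_in_progress
    sleep(pause)
    retry # re-runs the method body from the top
  else
    raise
  end
end

consumer = FlakyConsumer.new(:coordinator_load_in_progress)
puts sketch_commit(consumer)
puts consumer.calls
```

A caller would typically treat a `false` result as a signal to stop working on the partition rather than as an error to retry.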

#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList

Similar to #query_watermark_offsets.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList, nil) (defaults to: nil)

    tpl, or nil to use the tpl of the full current assignment

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    tpl with committed offsets and metadata



# File 'lib/karafka/connection/proxy.rb', line 86

def committed(tpl = nil)
  c_config = @config.committed

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: c_config.wait_time / 1_000.to_f,
    max_attempts: c_config.max_attempts
  ) do
    @wrapped.committed(tpl, c_config.timeout)
  end
end

#lag(tpl) ⇒ Hash<String, Hash>

Returns hash with topics and their partitions lags.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)

    list of topics and partitions for which we want to get the lag of the defined consumer group

Returns:

  • (Hash<String, Hash>)

    hash with topics and their partitions lags



# File 'lib/karafka/connection/proxy.rb', line 152

def lag(tpl)
  l_config = @config.committed

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.lag(tpl, l_config.timeout)
  end
end

#metadata(topic_name = nil) ⇒ Rdkafka::Metadata

Returns rdkafka metadata object with the requested details.

Parameters:

  • topic_name (String, nil) (defaults to: nil)

    Name of the topic we’re interested in or nil if we want to get info on all topics

Returns:

  • (Rdkafka::Metadata)

    rdkafka metadata object with the requested details



# File 'lib/karafka/connection/proxy.rb', line 167

def metadata(topic_name = nil)
  m_config = @config.metadata

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: m_config.wait_time / 1_000.to_f,
    max_attempts: m_config.max_attempts
  ) do
    @wrapped.metadata(topic_name, m_config.timeout)
  end
end

#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList

Similar to #query_watermark_offsets, this method can be sensitive to latency. We handle this the same way.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)

    tpl to get time offsets

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    tpl with time offsets



# File 'lib/karafka/connection/proxy.rb', line 69

def offsets_for_times(tpl)
  l_config = @config.offsets_for_times

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.offsets_for_times(tpl, l_config.timeout)
  end
end

#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>

Proxies #query_watermark_offsets with extra recovery from timeout problems. We impose our own custom timeout to make sure that requests to high-latency and overloaded clusters can still be handled.

Parameters:

  • topic (String)

    topic name

  • partition (Partition)

Returns:

  • (Array<Integer, Integer>)

    watermark offsets



# File 'lib/karafka/connection/proxy.rb', line 49

def query_watermark_offsets(topic, partition)
  l_config = @config.query_watermark_offsets

  # For newly created topics or in cases where we're trying to get them but there is no
  # leader, this can fail. It happens more often for new topics under KRaft, however we
  # still want to make sure things operate as expected even then
  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.query_watermark_offsets(topic, partition, l_config.timeout)
  end
end
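
The retry wrapper itself is not shown on this page, so the following is only a sketch of the general pattern the methods above rely on: convert the configured wait time from milliseconds to seconds, then re-run the block after a transient failure. `retry_with_wait`, `ms_to_seconds`, and `TransientError` are hypothetical names, and treating `max_attempts` as the total number of attempts is an assumption of this sketch, not a statement about `with_broker_errors_retry`.

```ruby
# Hypothetical stand-in for a retryable broker error
class TransientError < StandardError; end

# Kernel#sleep expects seconds, while the settings above are in milliseconds
def ms_to_seconds(milliseconds)
  milliseconds / 1_000.to_f
end

# Hypothetical retry helper: re-runs the block after a TransientError,
# sleeping wait_time seconds between attempts, and re-raises once
# max_attempts attempts have been made
def retry_with_wait(max_attempts:, wait_time: 1)
  attempt = 0

  begin
    attempt += 1
    yield(attempt)
  rescue TransientError
    raise if attempt >= max_attempts

    sleep(wait_time)
    retry
  end
end

# Fails twice (for example because a new topic has no leader yet), then succeeds
result = retry_with_wait(max_attempts: 3, wait_time: ms_to_seconds(5)) do |attempt|
  raise TransientError, 'no leader yet' if attempt < 3

  [0, 42]
end

p result
```

Keeping the wait time and attempt count in configuration, as the methods above do, lets high-latency clusters be accommodated without code changes.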

#store_offset(message, metadata = nil) ⇒ Boolean

When we cannot store an offset, it means we no longer own the partition.

Non thread-safe offset storing method

Parameters:

  • message

  • metadata (defaults to: nil)

Returns:

  • (Boolean)

    true if we could store the offset (if we still own the partition)



# File 'lib/karafka/connection/proxy.rb', line 104

def store_offset(message, metadata = nil)
  @wrapped.store_offset(message, metadata)

  true
rescue Rdkafka::RdkafkaError => e
  return false if e.code == :assignment_lost
  return false if e.code == :state

  raise e
end
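
Since a `false` result means the partition is no longer owned, a caller can use it as a stop signal. The sketch below illustrates one possible way to do that. `StubProxy` and `process_until_lost` are hypothetical, and the stub takes plain integer offsets instead of message objects to stay self-contained.

```ruby
# Hypothetical proxy stub: behaves as if the partition is lost once the
# offset reaches lost_at
class StubProxy
  attr_reader :stored

  def initialize(lost_at:)
    @lost_at = lost_at
    @stored = []
  end

  def store_offset(offset)
    return false if offset >= @lost_at

    @stored << offset
    true
  end
end

# Stores offsets in order and stops at the first failed store, returning
# the offsets that were stored successfully
def process_until_lost(proxy, offsets)
  offsets.take_while { |offset| proxy.store_offset(offset) }
end

proxy = StubProxy.new(lost_at: 3)
p process_until_lost(proxy, [1, 2, 3, 4])
```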