Class: Karafka::Connection::Proxy

Inherits:
SimpleDelegator
  • Object
Defined in:
lib/karafka/connection/proxy.rb

Overview

Usually it is fine to use the Rdkafka::Consumer directly, because we need its functionality 1:1. There are, however, cases where we want extra recoveries or other handling of errors and settings. This is where this module comes in handy.

We do not want to wrap and delegate everything via a proxy object for performance reasons, but we still want to be able to alter some functionalities. This wrapper lets us do that when needed.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(obj) ⇒ Proxy

Returns a new instance of Proxy.

Parameters:

  • obj (Rdkafka::Consumer, Proxy)

    rdkafka consumer or consumer wrapped with proxy



# File 'lib/karafka/connection/proxy.rb', line 38

def initialize(obj)
  super
  # Do not allow for wrapping proxy with a proxy. This will prevent a case where we might
  # wrap an already wrapped object with another proxy level. Simplifies passing consumers
  # and makes it safe to wrap without type checking
  @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
end
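The guard in the constructor can be exercised in isolation with a minimal stand-in class. `MiniProxy` below is an illustrative name, not part of Karafka; it only mirrors the unwrap-before-store idea shown above:

```ruby
require 'delegate'

# Minimal re-creation of the double-wrap guard (sketch, not Karafka source)
class MiniProxy < SimpleDelegator
  attr_reader :wrapped

  # Delegation should always resolve against the raw object
  alias __getobj__ wrapped

  def initialize(obj)
    super
    # Unwrap if we were handed another proxy, so nesting never happens
    @wrapped = obj.is_a?(self.class) ? obj.wrapped : obj
  end
end

consumer = Object.new
once  = MiniProxy.new(consumer)
twice = MiniProxy.new(once)

# Both proxies point straight at the raw consumer, never at each other
twice.wrapped.equal?(consumer) # => true
```

Because of this guard, callers can wrap defensively without first checking whether they already hold a proxy.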

Instance Attribute Details

#wrapped ⇒ Object

Also known as: __getobj__

Returns the value of attribute wrapped.



# File 'lib/karafka/connection/proxy.rb', line 33

def wrapped
  @wrapped
end

Instance Method Details

#commit_offsets(tpl = nil, async: true) ⇒ Boolean

Note:

We do not consider no_offset a problem and we allow committing offsets even when none are stored, because a sync commit refreshes the consumer's ownership state synchronously.

Non thread-safe message committing method

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList, nil) (defaults to: nil)

    tpl or nil

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

    true if offset commit worked, false if we’ve lost the assignment



# File 'lib/karafka/connection/proxy.rb', line 127

def commit_offsets(tpl = nil, async: true)
  c_config = proxy_config.commit

  with_broker_errors_retry(
    wait_time: c_config.wait_time / 1_000.to_f,
    max_attempts: c_config.max_attempts
  ) do
    @wrapped.commit(tpl, async)
  end

  true
rescue Rdkafka::RdkafkaError => e
  case e.code
  when :assignment_lost
    return false
  when :unknown_member_id
    return false
  when :illegal_generation
    return false
  when :no_offset
    return true
  when :coordinator_load_in_progress
    sleep(1)
    retry
  end

  raise e
end
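The rescue clause above maps librdkafka error codes onto the boolean result. A stripped-down sketch of that mapping, using a stub error class in place of Rdkafka::RdkafkaError (`classify_commit_error` is an illustrative helper, not part of Karafka):

```ruby
# Stand-in for Rdkafka::RdkafkaError, which exposes a symbolic #code
class StubRdkafkaError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super(code.to_s)
  end
end

# Mirrors how commit_offsets reacts to each error code
def classify_commit_error(error)
  case error.code
  when :assignment_lost, :unknown_member_id, :illegal_generation
    false  # we lost the assignment; the commit result no longer matters
  when :no_offset
    true   # nothing was stored, but a sync commit still refreshed ownership
  when :coordinator_load_in_progress
    :retry # transient; the real code sleeps and retries
  else
    raise error # anything else bubbles up to the caller
  end
end

classify_commit_error(StubRdkafkaError.new(:no_offset))       # => true
classify_commit_error(StubRdkafkaError.new(:assignment_lost)) # => false
```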

#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList

Similar to #query_watermark_offsets.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList, nil) (defaults to: nil)

    tpl or nil for full current assignment tpl usage

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    tpl with committed offsets and metadata



# File 'lib/karafka/connection/proxy.rb', line 90

def committed(tpl = nil)
  c_config = proxy_config.committed

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: c_config.wait_time / 1_000.to_f,
    max_attempts: c_config.max_attempts
  ) do
    @wrapped.committed(tpl, c_config.timeout)
  end
end
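The `with_broker_errors_retry` helper used throughout these methods is private and not documented on this page. The following is a plausible sketch of its contract (retry a block up to `max_attempts` times, sleeping `wait_time` seconds between attempts), not the actual Karafka implementation:

```ruby
# Sketch of a bounded retry wrapper with a fixed wait between attempts.
# Assumed contract only; the real private helper may differ.
def with_broker_errors_retry(wait_time:, max_attempts:)
  attempt = 0

  begin
    attempt += 1
    yield
  rescue StandardError
    # Give up once the attempt budget is exhausted
    raise if attempt >= max_attempts

    sleep(wait_time)
    retry
  end
end

calls = 0

result = with_broker_errors_retry(wait_time: 0, max_attempts: 3) do
  calls += 1
  raise 'transient broker error' if calls < 3

  :ok
end
# result == :ok after two failed attempts and one success
```

This also explains the recurring `wait_time / 1_000.to_f` division: the configured value is in milliseconds, while `sleep` expects seconds.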

#lag(tpl) ⇒ Hash{String => Hash}

Returns hash with topics and their partitions lags.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)

    list of topics and partitions for which we want to get the lag on the defined CG

Returns:

  • (Hash{String => Hash})

    hash with topics and their partitions lags



# File 'lib/karafka/connection/proxy.rb', line 159

def lag(tpl)
  l_config = proxy_config.committed

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.lag(tpl, l_config.timeout)
  end
end
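The returned structure maps topic names to per-partition lag hashes. The values below are invented purely to illustrate the shape:

```ruby
# Illustrative shape of the Hash{String => Hash} returned by #lag:
# topic name => { partition number => lag }. All numbers are made up.
lags = {
  'events'   => { 0 => 12, 1 => 0 },
  'payments' => { 0 => 3 }
}

# Summing every partition's lag gives the total lag for the consumer group
total_lag = lags.values.sum { |partitions| partitions.values.sum }
```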

#metadata(topic_name = nil) ⇒ Rdkafka::Metadata

Returns rdkafka metadata object with the requested details.

Parameters:

  • topic_name (String, nil) (defaults to: nil)

    Name of the topic we’re interested in or nil if we want to get info on all topics

Returns:

  • (Rdkafka::Metadata)

    rdkafka metadata object with the requested details



# File 'lib/karafka/connection/proxy.rb', line 174

def metadata(topic_name = nil)
  m_config = proxy_config.metadata

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: m_config.wait_time / 1_000.to_f,
    max_attempts: m_config.max_attempts
  ) do
    @wrapped.metadata(topic_name, m_config.timeout)
  end
end

#offsets_for_times(tpl) ⇒ Rdkafka::Consumer::TopicPartitionList

Similar to #query_watermark_offsets, this method can be sensitive to latency. We handle it the same way.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)

    tpl to get time offsets

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    tpl with time offsets



# File 'lib/karafka/connection/proxy.rb', line 73

def offsets_for_times(tpl)
  l_config = proxy_config.offsets_for_times

  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.offsets_for_times(tpl, l_config.timeout)
  end
end

#query_watermark_offsets(topic, partition) ⇒ Array<Integer, Integer>

Proxies the #query_watermark_offsets with extra recovery from timeout problems. We impose our own custom timeout to make sure that high-latency and overloaded clusters can handle our requests.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:

  • (Array<Integer, Integer>)

    watermark offsets



# File 'lib/karafka/connection/proxy.rb', line 53

def query_watermark_offsets(topic, partition)
  l_config = proxy_config.query_watermark_offsets

  # For newly created topics or in cases where we're trying to get them but there is no
  # leader, this can fail. It happens more often for new topics under KRaft, however we
  # still want to make sure things operate as expected even then
  with_broker_errors_retry(
    # required to be in seconds, not ms
    wait_time: l_config.wait_time / 1_000.to_f,
    max_attempts: l_config.max_attempts
  ) do
    @wrapped.query_watermark_offsets(topic, partition, l_config.timeout)
  end
end
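A usage sketch: deriving an approximate partition lag from the watermark offsets. `StubProxy` stands in for a real proxied consumer, and all numbers are invented:

```ruby
# Stand-in that answers like the proxied consumer would
class StubProxy
  def query_watermark_offsets(_topic, _partition)
    # [low, high] watermark offsets for the partition
    [0, 1_000]
  end
end

proxy = StubProxy.new
_low, high = proxy.query_watermark_offsets('events', 0)

# Lag is the distance between the high watermark and our committed offset
committed_offset = 850
lag = high - committed_offset # => 150
```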

#store_offset(message, metadata = nil) ⇒ Boolean

When we cannot store an offset, it means we no longer own the partition.

Non thread-safe offset storing method

Parameters:

Returns:

  • (Boolean)

    true if we could store the offset (if we still own the partition)



# File 'lib/karafka/connection/proxy.rb', line 108

def store_offset(message, metadata = nil)
  @wrapped.store_offset(message, metadata)

  true
rescue Rdkafka::RdkafkaError => e
  return false if e.code == :assignment_lost
  return false if e.code == :state
  return false if e.code == :illegal_generation

  raise e
end
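The rescue-to-boolean pattern above (lost-assignment-related codes become `false`, everything else re-raises) can be sketched with a stub error class in place of Rdkafka::RdkafkaError; `attempt_store` is an illustrative helper, not Karafka API:

```ruby
# Stand-in for Rdkafka::RdkafkaError with a symbolic #code
class StubRdkafkaError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super(code.to_s)
  end
end

# Codes that mean the partition is no longer ours (mirrors store_offset)
LOST_ASSIGNMENT_CODES = %i[assignment_lost state illegal_generation].freeze

def attempt_store(error_code = nil)
  # Simulate the underlying store_offset call failing with a given code
  raise StubRdkafkaError.new(error_code) if error_code

  true
rescue StubRdkafkaError => e
  # Unknown errors still bubble up; only ownership-loss maps to false
  raise e unless LOST_ASSIGNMENT_CODES.include?(e.code)

  false
end

attempt_store                   # => true  (offset stored, partition still ours)
attempt_store(:assignment_lost) # => false (partition no longer ours)
```

Callers can therefore branch on the boolean instead of rescuing rebalance errors themselves.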