Class: Rdkafka::Producer

Inherits:
Object
Includes:
Helpers::OAuth, Helpers::Time
Defined in:
lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb

Overview

A producer for Kafka messages. To create a producer, set up a Config and call #producer on it.
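
A minimal usage sketch (the broker address is illustrative):

require "rdkafka"

config = Rdkafka::Config.new("bootstrap.servers" => "localhost:9092")
producer = config.producer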

Defined Under Namespace

Classes: DeliveryHandle, DeliveryReport, TopicHandleCreationError

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::OAuth

#oauthbearer_set_token, #oauthbearer_set_token_failure

Methods included from Helpers::Time

#monotonic_now

Instance Attribute Details

#delivery_callback=(callback) ⇒ nil

Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport and a DeliveryHandle.

Parameters:

  • callback (Proc, #call)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/producer.rb', line 132

def delivery_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
  @delivery_callback = callback
  @delivery_callback_arity = arity(callback)
end
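
Assuming the producer from the overview sketch, a callback that logs every successful delivery could look like this:

producer.delivery_callback = lambda do |report, handle|
  puts "Delivered #{handle.label.inspect} to #{report.topic_name}/#{report.partition} at offset #{report.offset}"
end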

Instance Method Details

#arity(callback) ⇒ Integer

Figures out the arity of a given block/method

Parameters:

  • callback (#call, Proc)

Returns:

  • (Integer)

    arity of the provided block/method



# File 'lib/rdkafka/producer.rb', line 390

def arity(callback)
  return callback.arity if callback.respond_to?(:arity)

  callback.method(:call).arity
end

#call_delivery_callback(delivery_report, delivery_handle) ⇒ Object

Calls the delivery callback (if one is registered).

Parameters:

  • delivery_report (DeliveryReport)

    The delivery report

  • delivery_handle (DeliveryHandle)

    The delivery handle

# File 'lib/rdkafka/producer.rb', line 373

def call_delivery_callback(delivery_report, delivery_handle)
  return unless @delivery_callback

  case @delivery_callback_arity
  when 0
    @delivery_callback.call
  when 1
    @delivery_callback.call(delivery_report)
  else
    @delivery_callback.call(delivery_report, delivery_handle)
  end
end
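
Because the registered callback's arity drives this dispatch, procs taking zero, one, or two arguments are all supported. A sketch of the three shapes:

producer.delivery_callback = -> { puts "delivered" }                  # arity 0
producer.delivery_callback = ->(report) { puts report.offset }        # arity 1
producer.delivery_callback = ->(report, handle) { puts handle.label } # arity 2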

#close ⇒ Object

Close this producer and wait for the internal poll queue to empty.



# File 'lib/rdkafka/producer.rb', line 139

def close
  return if closed?
  ObjectSpace.undefine_finalizer(self)

  @native_kafka.close do
    # We need to remove the topic reference objects before we destroy the producer,
    # otherwise they would leak out
    @topics_refs_map.each_value do |refs|
      refs.each_value do |ref|
        Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
      end
    end
  end

  @topics_refs_map.clear
end

#closed? ⇒ Boolean

Whether this producer has been closed.

Returns:

  • (Boolean)


# File 'lib/rdkafka/producer.rb', line 157

def closed?
  @native_kafka.closed?
end

#flush(timeout_ms = 5_000) ⇒ Boolean

Note:

We raise an exception for other errors because, based on the librdkafka docs, there should be no other errors.

Note:

For timed_out we do not raise an error, to keep this backwards compatible.

Wait until all outstanding producer requests are completed, with the given timeout in milliseconds. Call this before closing a producer to ensure delivery of all messages.

Parameters:

  • timeout_ms (Integer) (defaults to: 5_000)

how long to wait, in milliseconds, for all messages to be flushed

Returns:

  • (Boolean)

true if all data was flushed, false if there are still outgoing messages after the timeout



# File 'lib/rdkafka/producer.rb', line 172

def flush(timeout_ms=5_000)
  closed_producer_check(__method__)

  code = nil

  @native_kafka.with_inner do |inner|
    code = Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms)
  end

  # Return early so we don't needlessly build the error message
  return true if code.zero?

  error = Rdkafka::RdkafkaError.new(code)

  return false if error.code == :timed_out

  raise(error)
end
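
A sketch of checking the boolean result before shutdown:

# Allow up to 10 seconds for outstanding requests to complete
unless producer.flush(10_000)
  warn "flush timed out; some messages may not have been delivered"
end
producer.close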

#name ⇒ String

Returns producer name.

Returns:

  • (String)

    producer name



# File 'lib/rdkafka/producer.rb', line 120

def name
  @name ||= @native_kafka.with_inner do |inner|
    ::Rdkafka::Bindings.rd_kafka_name(inner)
  end
end

#partition_count(topic) ⇒ Integer

Note:

If 'allow.auto.create.topics' is set to true on the broker, the topic will be auto-created after this method returns -1 for it.

Note:

We cache the partition count for a given topic for a given time. This prevents us from querying for the count with each message when someone uses partition_key. Instead, we query at most once every 30 seconds if we have a valid partition count, or every 5 seconds if we were not able to obtain the number of partitions.

Partition count for a given topic.

Parameters:

  • topic (String)

    The topic name.

Returns:

  • (Integer)

    partition count for a given topic or -1 if it could not be obtained.



# File 'lib/rdkafka/producer.rb', line 229

def partition_count(topic)
  closed_producer_check(__method__)

  @_partitions_count_cache.delete_if do |_, cached|
    monotonic_now - cached.first > PARTITIONS_COUNT_TTL
  end

  @_partitions_count_cache[topic].last
end
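
For illustration (the topic name is assumed to exist):

count = producer.partition_count("example_topic")
puts count.negative? ? "partition count unknown" : "#{count} partitions"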

#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil, topic_config: EMPTY_HASH) ⇒ DeliveryHandle

Produces a message to a Kafka topic. The message is added to rdkafka's queue; call #wait on the returned delivery handle to make sure it is delivered.

When no partition is specified, the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided, it is used instead of the auto-generated timestamp.

Parameters:

  • topic (String)

    The topic to produce to

  • payload (String, nil) (defaults to: nil)

    The message’s payload

  • key (String, nil) (defaults to: nil)

    The message’s key

  • partition (Integer, nil) (defaults to: nil)

    Optional partition to produce to

  • partition_key (String, nil) (defaults to: nil)

Optional partition key on which partition assignment is based

  • timestamp (Time, Integer, nil) (defaults to: nil)

    Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.

  • headers (Hash<String,String>) (defaults to: nil)

    Optional message headers

  • label (Object, nil) (defaults to: nil)

Optional label assigned when producing the message; it becomes part of the delivery handle and the delivery report

  • topic_config (Hash) (defaults to: EMPTY_HASH)

Topic configuration for this message dispatch. Allows sending messages to topics with a different configuration.

Returns:

  • (DeliveryHandle)

    Delivery handle that can be used to wait for the result of producing this message

Raises:

  • (RdkafkaError)

    When adding the message to rdkafka’s queue failed



# File 'lib/rdkafka/producer.rb', line 257

def produce(
  topic:,
  payload: nil,
  key: nil,
  partition: nil,
  partition_key: nil,
  timestamp: nil,
  headers: nil,
  label: nil,
  topic_config: EMPTY_HASH
)
  closed_producer_check(__method__)

  # Start by checking and converting the input

  # Get payload length
  payload_size = if payload.nil?
                   0
                 else
                   payload.bytesize
                 end

  # Get key length
  key_size = if key.nil?
               0
             else
               key.bytesize
             end

  topic_config_hash = topic_config.hash

  # Checks if we have the rdkafka topic reference object ready. It saves us an object
  # allocation and allows us to use a custom config on demand.
  set_topic_config(topic, topic_config, topic_config_hash) unless @topics_refs_map.dig(topic, topic_config_hash)
  topic_ref = @topics_refs_map.dig(topic, topic_config_hash)

  if partition_key
    partition_count = partition_count(topic)

    # Check for partitioner overrides and use the default one only when no per-topic
    # partitioner is present.
    partitioner_name = @topics_configs.dig(topic, topic_config_hash, :partitioner) || @partitioner_name

    # If the topic is not present, set to -1
    partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, partitioner_name) if partition_count.positive?
  end

  # If partition is nil, use -1 to let librdkafka set the partition randomly or
  # based on the key when present.
  partition ||= -1

  # If timestamp is nil use 0 and let Kafka set one. If an integer or time
  # use it.
  raw_timestamp = if timestamp.nil?
                    0
                  elsif timestamp.is_a?(Integer)
                    timestamp
                  elsif timestamp.is_a?(Time)
                    (timestamp.to_i * 1000) + (timestamp.usec / 1000)
                  else
                    raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
                  end

  delivery_handle = DeliveryHandle.new
  delivery_handle.label = label
  delivery_handle.topic = topic
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle)

  args = [
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_RKT, :pointer, topic_ref,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
  ]

  if headers
    headers.each do |key0, value0|
      key = key0.to_s
      value = value0.to_s
      args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER
      args << :string << key
      args << :pointer << value
      args << :size_t << value.bytesize
    end
  end

  args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END

  # Produce the message
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_producev(
      inner,
      *args
    )
  end

  # Raise error if the produce call was not successful
  if response != 0
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    raise RdkafkaError.new(response)
  end

  delivery_handle
end
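
A sketch of a typical produce-and-wait flow; the topic name, payload and headers are illustrative:

handle = producer.produce(
  topic:   "example_topic",
  payload: "payload body",
  key:     "key-1",
  headers: { "source" => "docs" }
)

# Blocks until delivery is confirmed; raises on delivery failure or timeout
report = handle.wait(max_wait_timeout: 10)
puts "Delivered to partition #{report.partition} at offset #{report.offset}"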

#purge ⇒ Object

Purges the outgoing queue and releases all resources.

Useful when closing a producer that still has outgoing messages to unstable clusters, or when waiting cannot continue for any other reason. This purges both the queue and all in-flight requests, and updates the delivery handles' statuses so they can be materialized into purge_queue errors.



# File 'lib/rdkafka/producer.rb', line 197

def purge
  closed_producer_check(__method__)

  code = nil

  @native_kafka.with_inner do |inner|
    code = Bindings.rd_kafka_purge(
      inner,
      Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
    )
  end

  code.zero? || raise(Rdkafka::RdkafkaError.new(code))

  # Wait for the purge to affect everything
  sleep(0.001) until flush(100)

  true
end
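
A sketch of an emergency shutdown against an unstable cluster:

# Give delivery 5 seconds; if messages are still queued, abandon them
producer.purge unless producer.flush(5_000)
producer.close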

#set_topic_config(topic, config, config_hash) ⇒ Object

Note:

Re-setting the same topic config is not allowed, because of the underlying librdkafka caching.

Sets an alternative set of configuration details on a per-topic basis.

Parameters:

  • topic (String)

    The topic name

  • config (Hash)

config we want to use on a per-topic basis

  • config_hash (Integer)

    hash of the config. We expect it here instead of computing it, because it is already computed during the retrieval attempt in the #produce flow.



# File 'lib/rdkafka/producer.rb', line 74

def set_topic_config(topic, config, config_hash)
  # Ensure lock on topic reference just in case
  @native_kafka.with_inner do |inner|
    @topics_refs_map[topic] ||= {}
    @topics_configs[topic] ||= {}

    return if @topics_configs[topic].key?(config_hash)

    # If config is empty, we create an empty reference that will be used with defaults
    rd_topic_config = if config.empty?
                        nil
                      else
                        Rdkafka::Bindings.rd_kafka_topic_conf_new.tap do |topic_config|
                          config.each do |key, value|
                            error_buffer = FFI::MemoryPointer.new(:char, 256)
                            result = Rdkafka::Bindings.rd_kafka_topic_conf_set(
                              topic_config,
                              key.to_s,
                              value.to_s,
                              error_buffer,
                              256
                            )

                            unless result == :config_ok
                              raise Config::ConfigError.new(error_buffer.read_string)
                            end
                          end
                        end
                      end

    topic_handle = Bindings.rd_kafka_topic_new(inner, topic, rd_topic_config)

    raise TopicHandleCreationError.new("Error creating topic handle for topic #{topic}") if topic_handle.null?

    @topics_configs[topic][config_hash] = config
    @topics_refs_map[topic][config_hash] = topic_handle
  end
end
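
This method is invoked from #produce when a topic_config is supplied, so per-topic overrides are normally expressed at the produce call site. A sketch, assuming the broker accepts the acks override:

producer.produce(
  topic:        "example_topic",
  payload:      "critical data",
  # librdkafka topic-level setting; cached per topic + config hash
  topic_config: { "acks" => "all" }
)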

#start ⇒ Object

Note:

Not needed unless explicit start was disabled.

Starts the native Kafka polling thread and kicks off the init polling.



# File 'lib/rdkafka/producer.rb', line 115

def start
  @native_kafka.start
end