Class: Rdkafka::Consumer

Inherits:
Object
Includes:
Enumerable, Helpers::Time
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/headers.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka brokers automatically assign partitions and load balance partitions over consumers that have the same :"group.id" set in their configuration.

To create a consumer, set up a Config and call consumer on it. Setting :"group.id" in the configuration is mandatory.

Consumer implements Enumerable, so you can use each to consume messages, or for example each_slice to consume batches of messages.
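
As a minimal sketch of the setup described above: the broker address, group name, and topic name are placeholders, and SETTINGS, build_consumer, and consume_in_slices are illustrative names rather than part of the library. The gem is required lazily so the helpers can be loaded without it.

```ruby
# Placeholder settings; :"group.id" is the option the overview calls mandatory.
SETTINGS = {
  :"bootstrap.servers" => "localhost:9092", # assumed broker address
  :"group.id" => "example-group"            # assumed group name
}.freeze

# Builds a consumer from a settings hash via Config#consumer.
def build_consumer(settings = SETTINGS)
  require "rdkafka" # loaded here so this file parses without the gem
  Rdkafka::Config.new(settings).consumer
end

# Yields arrays of up to `size` messages using Enumerable#each_slice.
# Works with any object whose #each yields messages.
def consume_in_slices(consumer, size, &block)
  consumer.each_slice(size, &block)
end

# Usage against a real broker (not executed here):
#   consumer = build_consumer
#   consumer.subscribe("example-topic")
#   consume_in_slices(consumer, 10) { |batch| puts batch.map(&:offset).inspect }
```

With a real consumer, a final slice smaller than `size` is only yielded once iteration ends, that is, once the consumer is closed.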

Defined Under Namespace

Modules: Headers

Classes: Message, Partition, TopicPartitionList

Instance Method Summary

Methods included from Helpers::Time

#monotonic_now

Instance Method Details

#assign(list) ⇒ Object

Atomic assignment of partitions to consume

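Parameters:

  • list (TopicPartitionList)

    The topic with partitions to assign

Raises:

  • (RdkafkaError)

    When assigning fails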



# File 'lib/rdkafka/consumer.rb', line 175

def assign(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_assign(inner, tpl)
    end
    if response != 0
      raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#assignment ⇒ TopicPartitionList

Returns the current partition assignment.

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the assignment fails



# File 'lib/rdkafka/consumer.rb', line 200

def assignment
  closed_consumer_check(__method__)

  ptr = FFI::MemoryPointer.new(:pointer)
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_assignment(inner, ptr)
  end
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  tpl = ptr.read_pointer

  if !tpl.null?
    begin
      Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
    ensure
      Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
    end
  end
ensure
  ptr.free unless ptr.nil?
end

#assignment_lost? ⇒ Boolean

Returns true if our current assignment has been lost involuntarily.

Returns:

  • (Boolean)

    true if our current assignment has been lost involuntarily.



# File 'lib/rdkafka/consumer.rb', line 225

def assignment_lost?
  closed_consumer_check(__method__)

  @native_kafka.with_inner do |inner|
    !Rdkafka::Bindings.rd_kafka_assignment_lost(inner).zero?
  end
end

#close ⇒ nil

Close this consumer

Returns:

  • (nil)


# File 'lib/rdkafka/consumer.rb', line 35

def close
  return if closed?
  ObjectSpace.undefine_finalizer(self)

  @native_kafka.synchronize do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_close(inner)
  end

  @native_kafka.close
end
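
One way to make sure the consumer is always closed is an ensure block. The helper below is a hypothetical sketch, not a library method; it only assumes the object it receives responds to close.

```ruby
# Yields the consumer and closes it afterwards, even if the block raises.
# `consumer` is anything responding to #close (e.g. an Rdkafka::Consumer).
def with_consumer(consumer)
  yield consumer
ensure
  consumer.close
end
```

Because close returns early when the consumer is already closed, calling it again after the block has closed the consumer itself is harmless.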

#closed? ⇒ Boolean

Whether this consumer has closed

Returns:

  • (Boolean)


# File 'lib/rdkafka/consumer.rb', line 47

def closed?
  @native_kafka.closed?
end

#cluster_id ⇒ String?

Returns the ClusterId as reported in broker metadata.

Returns:

  • (String, nil)


# File 'lib/rdkafka/consumer.rb', line 361

def cluster_id
  closed_consumer_check(__method__)
  @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_clusterid(inner)
  end
end

#commit(list = nil, async = false) ⇒ nil

Manually commit the current offsets of this consumer.

To use this, set enable.auto.commit to false to disable automatic triggering of commits.

If enable.auto.offset.store is set to true the offset of the last consumed message for every partition is used. If set to false you can use #store_offset to indicate when a message has been fully processed.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to commit

  • async (Boolean) (defaults to: false)

    Whether to commit async or wait for the commit to finish

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When committing fails



# File 'lib/rdkafka/consumer.rb', line 491

def commit(list=nil, async=false)
  closed_consumer_check(__method__)

  if !list.nil? && !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list ? list.to_native_tpl : nil

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_commit(inner, tpl, async)
    end
    if response != 0
      raise Rdkafka::RdkafkaError.new(response)
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
  end
end
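
A possible process-then-commit pattern is sketched below. It assumes the consumer was configured with both enable.auto.commit and enable.auto.offset.store set to false; process_and_commit is a hypothetical helper name.

```ruby
# Processes one message, marks it as done, then commits synchronously.
# Assumes :"enable.auto.commit" => false and
# :"enable.auto.offset.store" => false in the consumer config.
def process_and_commit(consumer, message)
  yield message                  # caller-supplied processing
  consumer.store_offset(message) # record that this message is fully processed
  consumer.commit                # list = nil, async = false: waits for the commit
end
```

Committing after every message is simple but slow; committing once per batch of processed messages is a common alternative.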

#committed(list = nil, timeout_ms = 2000) ⇒ TopicPartitionList

Return the current committed offset per partition for this consumer group. The offset field of each requested partition will be set either to the stored offset or to -1001 if there was no stored offset for that partition.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for, or nil to use the current assignment.

  • timeout_ms (Integer) (defaults to: 2000)

    The timeout for fetching this information.

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the committed positions fails.



# File 'lib/rdkafka/consumer.rb', line 242

def committed(list=nil, timeout_ms=2000)
  closed_consumer_check(__method__)

  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_committed(inner, tpl, timeout_ms)
    end
    if response != 0
      raise Rdkafka::RdkafkaError.new(response)
    end
    TopicPartitionList.from_native_tpl(tpl)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#each {|message| ... } ⇒ nil

Poll for new messages and yield for each received one. Iteration will end when the consumer is closed.

If enable.partition.eof is turned on in the config this will raise an error when an eof is reached, so you probably want to disable that when using this method of iteration.

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling fails



# File 'lib/rdkafka/consumer.rb', line 577

def each
  loop do
    message = poll(250)
    if message
      yield(message)
    else
      if closed?
        break
      else
        next
      end
    end
  end
end

#each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false) {|messages, pending_exception| ... } ⇒ nil

Poll for new messages and yield them in batches that may contain messages from more than one partition.

Rather than yield each message immediately as soon as it is received, each_batch will attempt to wait for as long as timeout_ms in order to create a batch of up to but no more than max_items in size.

Said differently, if more than max_items are available within timeout_ms, then each_batch will yield early with max_items in the array, but if timeout_ms passes by with fewer messages arriving, it will yield an array of fewer messages, quite possibly zero.

In order to prevent wrongly auto committing many messages at once across possibly many partitions, callers must explicitly indicate which messages have been successfully processed as some consumed messages may not have been yielded yet. To do this, the caller should set enable.auto.offset.store to false and pass processed messages to #store_offset. It is also possible, though more complex, to set ‘enable.auto.commit’ to false and then pass a manually assembled TopicPartitionList to #commit.

As with each, iteration will end when the consumer is closed.

Exception behavior is more complicated than with each: if yield_on_error is true, an exception is raised during the poll, and messages have already been received, those messages are yielded to the caller first, and the exception is propagated after processing of that partial batch is complete.

If you set either enable.auto.commit or enable.auto.offset.store to false in the consumer configuration, leave yield_on_error at its default value of false, because you are guaranteed to see these messages again. However, if both enable.auto.commit and enable.auto.offset.store are set to true, set yield_on_error to true so that you can process messages that you may or may not see again.

Parameters:

  • max_items (Integer) (defaults to: 100)

    Maximum size of the yielded array of messages

  • bytes_threshold (Integer) (defaults to: Float::INFINITY)

    Threshold number of total message bytes in the yielded array of messages

  • timeout_ms (Integer) (defaults to: 250)

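    Maximum time to wait for up to max_items

  • yield_on_error (Boolean) (defaults to: false)

    Whether to yield an already collected partial batch, together with the pending exception, before that exception propagates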

Yields:

  • (messages, pending_exception)

Yield Parameters:

  • messages (Array)

    An array of received Message

  • pending_exception (Exception)

    Normally nil; otherwise the exception that will be propagated after the partial batch has been processed

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling fails



# File 'lib/rdkafka/consumer.rb', line 640

def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
  closed_consumer_check(__method__)
  slice = []
  bytes = 0
  end_time = monotonic_now + timeout_ms / 1000.0
  loop do
    break if closed?
    max_wait = end_time - monotonic_now
    max_wait_ms = if max_wait <= 0
                    0  # should not block, but may retrieve a message
                  else
                    (max_wait * 1000).floor
                  end
    message = nil
    begin
      message = poll max_wait_ms
    rescue Rdkafka::RdkafkaError => error
      raise unless yield_on_error
      raise if slice.empty?
      yield slice.dup, error
      raise
    end
    if message
      slice << message
      bytes += message.payload.bytesize if message.payload
    end
    if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001
      yield slice.dup, nil
      slice.clear
      bytes = 0
      end_time = monotonic_now + timeout_ms / 1000.0
    end
  end
end
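
The recommended combination of each_batch and store_offset could look roughly like this. It is a sketch that assumes enable.auto.offset.store is set to false in the consumer config; process_batches is a hypothetical helper name.

```ruby
# Hands each message of every batch to the block and marks it processed.
# Assumes :"enable.auto.offset.store" => false in the consumer config.
def process_batches(consumer, max_items: 100, timeout_ms: 250)
  consumer.each_batch(max_items: max_items, timeout_ms: timeout_ms) do |messages, _pending_exception|
    # `messages` may be empty when timeout_ms passes without any arrivals.
    messages.each do |message|
      yield message
      consumer.store_offset(message)
    end
  end
end
```

Storing the offset only after the block returns means a message whose processing raised is not marked as processed.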

#events_poll(timeout_ms = 0) ⇒ Object

Note:

Strictly speaking, this method should be called #poll and the current #poll should be called #consumer_poll, but the existing names are kept for backward compatibility.

Polls the main rdkafka queue (not the consumer one). Do NOT use it if consumer_poll_set was set to true.

Events will cause application-provided callbacks to be called.

Events (in the context of the consumer):

  • error callbacks

  • stats callbacks

  • any other callbacks supported by librdkafka that are not part of the consumer_poll and that have a callback configured and activated

This method needs to be called at regular intervals to serve any queued callbacks waiting to be called. When in use, it does NOT replace #poll; it must run alongside it.

Parameters:

  • timeout_ms (Integer) (defaults to: 0)

    Poll timeout. If set to 0, the call returns immediately without blocking; if set to -1, it blocks until events are available.



# File 'lib/rdkafka/consumer.rb', line 562

def events_poll(timeout_ms = 0)
  @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_poll(inner, timeout_ms)
  end
end

#finalizer ⇒ Object



# File 'lib/rdkafka/consumer.rb', line 29

def finalizer
  ->(_) { close }
end

#lag(topic_partition_list, watermark_timeout_ms = 1000) ⇒ Hash<String, Hash<Integer, Integer>>

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or #position. It is also possible to create one yourself; in that case the list must already contain all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 1000)

    The timeout for each query watermark call.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash containing all topics with the lag per partition

Raises:

  • (RdkafkaError)

    When querying the broker fails



# File 'lib/rdkafka/consumer.rb', line 337

def lag(topic_partition_list, watermark_timeout_ms=1000)
  out = {}

  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
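
The returned hash maps each topic to a hash of partition to lag. The helpers below are hypothetical and only illustrate working with that shape; they take a plain Hash, so they do not need a broker.

```ruby
# Sums a lag hash of the documented shape: { topic => { partition => lag } }.
def total_lag(lag_by_topic)
  lag_by_topic.values.sum { |partitions| partitions.values.sum }
end

# Returns [topic, partition] pairs whose lag exceeds `threshold`.
def lagging_partitions(lag_by_topic, threshold)
  lag_by_topic.flat_map do |topic, partitions|
    partitions.select { |_partition, lag| lag > threshold }
              .keys
              .map { |partition| [topic, partition] }
  end
end

# Usage with a real consumer (not executed here):
#   lag = consumer.lag(consumer.committed)
#   warn "behind by #{total_lag(lag)} messages" if total_lag(lag) > 1_000
```

Partitions whose offset is nil are skipped by #lag, so they do not appear in the result at all.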

#member_id ⇒ String?

Returns this client’s broker-assigned group member id

This currently requires the high-level KafkaConsumer

Returns:

  • (String, nil)


# File 'lib/rdkafka/consumer.rb', line 373

def member_id
  closed_consumer_check(__method__)
  @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_memberid(inner)
  end
end

#name ⇒ String

Returns consumer name.

Returns:

  • (String)

    consumer name



# File 'lib/rdkafka/consumer.rb', line 23

def name
  @name ||= @native_kafka.with_inner do |inner|
    ::Rdkafka::Bindings.rd_kafka_name(inner)
  end
end

#offsets_for_times(list, timeout_ms = 1000) ⇒ TopicPartitionList

Lookup offset for the given partitions by timestamp.

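Parameters:

  • list (TopicPartitionList)

    The partitions to look up, with the offset field of each partition set to the timestamp to search for

  • timeout_ms (Integer) (defaults to: 1000)

    The timeout for the lookup

Returns:

  • (TopicPartitionList)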

Raises:

  • (RdkafkaError)

    When the OffsetForTimes lookup fails



# File 'lib/rdkafka/consumer.rb', line 452

def offsets_for_times(list, timeout_ms = 1000)
  closed_consumer_check(__method__)

  if !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_offsets_for_times(
      inner,
      tpl,
      timeout_ms # timeout
    )
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  TopicPartitionList.from_native_tpl(tpl)
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end
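
librdkafka reads the timestamp to look up from the offset field of each partition, expressed in milliseconds since the epoch. The helper below is a hypothetical sketch that builds such a partition-to-timestamp hash; the topic name in the usage comment is a placeholder.

```ruby
# Builds { partition => timestamp_ms } for use as the offsets argument of
# TopicPartitionList#add_topic_and_partitions_with_offsets.
def timestamps_for(partitions, time)
  timestamp_ms = (time.to_r * 1000).floor # milliseconds since the epoch
  partitions.to_h { |partition| [partition, timestamp_ms] }
end

# Usage with a real consumer (not executed here):
#   list = Rdkafka::Consumer::TopicPartitionList.new
#   list.add_topic_and_partitions_with_offsets(
#     "example-topic", timestamps_for(0..2, Time.now - 3600)
#   )
#   offsets = consumer.offsets_for_times(list)
```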

#pause(list) ⇒ nil

Pause producing or consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to pause

Returns:

  • (nil)

Raises:

  • (RdkafkaTopicPartitionListError)

    When pausing fails



# File 'lib/rdkafka/consumer.rb', line 97

def pause(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_pause_partitions(inner, tpl)
    end

    if response != 0
      list = TopicPartitionList.from_native_tpl(tpl)
      raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

  • (RdkafkaError)

    When polling fails



# File 'lib/rdkafka/consumer.rb', line 517

def poll(timeout_ms)
  closed_consumer_check(__method__)

  message_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms)
  end
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if message_ptr && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end
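
Since poll returns nil when nothing arrives within the timeout, callers need to handle that case explicitly. A small sketch, where poll_up_to is a hypothetical helper name:

```ruby
# Polls until `max` messages are collected or a poll returns nil (timeout).
def poll_up_to(consumer, max, timeout_ms: 250)
  messages = []
  while messages.size < max
    message = consumer.poll(timeout_ms)
    break if message.nil? # nothing arrived within timeout_ms
    messages << message
  end
  messages
end
```

Errors reported by the broker surface as an RdkafkaError raised from poll, so this helper lets them propagate to the caller.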

#position(list = nil) ⇒ TopicPartitionList

Return the current positions (offsets) for topics and partitions. The offset field of each requested partition will be set to the offset of the last consumed message + 1, or nil in case there was no previous message.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for, or nil to use the current assignment.

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the positions fails



# File 'lib/rdkafka/consumer.rb', line 274

def position(list=nil)
  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_position(inner, tpl)
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  TopicPartitionList.from_native_tpl(tpl)
end

#query_watermark_offsets(topic, partition, timeout_ms = 1000) ⇒ Integer

Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: 1000)

    The timeout for querying the broker

Returns:

  • (Integer)

    The low and high watermark offsets, returned together as [low, high]

Raises:

  • (RdkafkaError)

    When querying the broker fails



# File 'lib/rdkafka/consumer.rb', line 301

def query_watermark_offsets(topic, partition, timeout_ms=1000)
  closed_consumer_check(__method__)

  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
      inner,
      topic,
      partition,
      low,
      high,
      timeout_ms,
    )
  end
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first
ensure
  low.free   unless low.nil?
  high.free  unless high.nil?
end
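
The method returns two values, so the result is usually destructured. A brief sketch, where offset_span is a hypothetical helper name:

```ruby
# Returns the distance between the low and high watermark of a partition.
def offset_span(consumer, topic, partition, timeout_ms: 1000)
  low, high = consumer.query_watermark_offsets(topic, partition, timeout_ms)
  high - low
end
```

Treat the span as an upper bound on the number of retained messages rather than an exact count, since compaction or transaction markers can leave gaps between offsets.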

#resume(list) ⇒ nil

Resume producing or consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to resume

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When resuming fails



# File 'lib/rdkafka/consumer.rb', line 125

def resume(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_resume_partitions(inner, tpl)
    end
    if response != 0
      raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end
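
Pausing and resuming are typically paired. The hypothetical helper below sketches one way to guarantee the resume call, assuming `list` is the same TopicPartitionList for both calls.

```ruby
# Pauses the partitions in `list`, runs the block, then resumes them,
# even if the block raises. `list` is expected to be a TopicPartitionList.
def with_paused(consumer, list)
  consumer.pause(list)
  begin
    yield
  ensure
    consumer.resume(list)
  end
end
```

The inner begin/ensure keeps resume from running when pause itself fails.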

#seek(message) ⇒ nil

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:

  • message (Message)

    The message to seek to

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When seeking fails



# File 'lib/rdkafka/consumer.rb', line 418

def seek(message)
  closed_consumer_check(__method__)

  # rd_kafka_offset_store is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_topic_new(
      inner,
      message.topic,
      nil
    )
  end
  response = Rdkafka::Bindings.rd_kafka_seek(
    native_topic,
    message.partition,
    message.offset,
    0 # timeout
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end
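
The listing above reads only topic, partition, and offset from its argument, so seeking to an arbitrary offset can be sketched with a small stand-in object. SeekTarget and seek_to are hypothetical names, and the approach relies on that duck typing holding in the version you use.

```ruby
# Minimal stand-in exposing the three readers #seek uses on its argument.
SeekTarget = Struct.new(:topic, :partition, :offset)

# Seeks `consumer` so the next poll on topic/partition starts at `offset`.
def seek_to(consumer, topic, partition, offset)
  consumer.seek(SeekTarget.new(topic, partition, offset))
end

# Usage with a real consumer (not executed here):
#   seek_to(consumer, "example-topic", 0, 42)
```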

#store_offset(message) ⇒ nil

Store offset of a message to be used in the next commit of this consumer

When using this, enable.auto.offset.store should be set to false in the config.

Parameters:

  • message (Message)

    The message whose offset will be stored

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When storing the offset fails



# File 'lib/rdkafka/consumer.rb', line 387

def store_offset(message)
  closed_consumer_check(__method__)

  list = TopicPartitionList.new
  list.add_topic_and_partitions_with_offsets(
    message.topic,
    message.partition => message.offset + 1
  )

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_offsets_store(
      inner,
      tpl
    )
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end

#subscribe(*topics) ⇒ nil

Subscribes to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When subscribing fails



# File 'lib/rdkafka/consumer.rb', line 56

def subscribe(*topics)
  closed_consumer_check(__method__)

  # Create topic partition list with topics and no partition set
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)

  topics.each do |topic|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, topic, -1)
  end

  # Subscribe to topic partition list and check this was successful
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_subscribe(inner, tpl)
  end
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) unless tpl.nil?
end

#subscription ⇒ TopicPartitionList

Returns the current subscription to topics and partitions

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the subscription fails



# File 'lib/rdkafka/consumer.rb', line 150

def subscription
  closed_consumer_check(__method__)

  ptr = FFI::MemoryPointer.new(:pointer)
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_subscription(inner, ptr)
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  native = ptr.read_pointer

  begin
    Rdkafka::Consumer::TopicPartitionList.from_native_tpl(native)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native)
  end
end

#unsubscribe ⇒ nil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When unsubscribing fails



# File 'lib/rdkafka/consumer.rb', line 81

def unsubscribe
  closed_consumer_check(__method__)

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_unsubscribe(inner)
  end
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end