Class: Karafka::Pro::Iterator

Inherits:
Object
Defined in:
lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb

Overview

Topic iterator allows you to iterate over topic/partition data and perform lookups for information that you need.

It supports early stops upon finding the requested data and allows for seeking to the end. It also allows signaling that a given message should be the last one from a certain partition, while iteration over the other partitions continues.

It does not create a consumer group and does not have any offset management until the first consumer offset marking happens, so it can be used for quick seeks as well as iterative, repetitive data fetching from rake tasks, etc.

Defined Under Namespace

Classes: Expander, TplBuilder

Instance Method Summary

Constructor Details

#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator

Note:

It is worth keeping in mind that this API also needs to operate within the max.poll.interval.ms limitation on each iteration.

Note:

In case of a never-ending iterator, you need to set enable.partition.eof to false so polling does not stop upon reaching the end of the partitions (the end at a given moment).

A simple API allowing you to iterate over topic/partition data without having to subscribe and deal with rebalances. This API allows for multi-partition streaming and is optimized for data lookups. It allows for explicitly stopping the iteration over any partition during the iteration process, enabling optimized lookups.

Parameters:

  • topics (Array<String>, Hash)

    a list of strings if we want to subscribe to multiple topics and all of their partitions, or a hash where keys are the topics and values are hashes with partitions and their initial offsets.

  • settings (Hash) (defaults to: { 'auto.offset.reset': 'beginning' })

    extra settings for the consumer. Please keep in mind that, if overwritten, you may want to include auto.offset.reset to match your case.

  • yield_nil (Boolean) (defaults to: false)

    whether we should also yield nil values when polling returns nothing. Useful in particular for long-living iterators.

  • max_wait_time (Integer) (defaults to: 200)

    maximum wait in ms when the iterator did not receive any messages
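
For illustration, a minimal construction sketch; the topic names below are hypothetical:

# All partitions of the listed topics, starting from the beginning
iterator = Karafka::Pro::Iterator.new(%w[events notifications])

# Only selected partitions, with explicit initial offsets
iterator = Karafka::Pro::Iterator.new(
  { 'events' => { 0 => 100, 1 => 25 } }
)

# A long-living iterator that keeps polling past the current end of data,
# as described in the notes above
iterator = Karafka::Pro::Iterator.new(
  %w[events],
  settings: { 'auto.offset.reset': 'beginning', 'enable.partition.eof': false },
  yield_nil: true
)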



# File 'lib/karafka/pro/iterator.rb', line 38

def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end

Instance Method Details

#each ⇒ Object

Iterates over the requested topic partitions and yields the results together with the iterator itself. The Iterator instance is yielded because one can run stop_partition to stop iterating over part of the data. This is useful for scenarios where we are looking for some information in all the partitions, but once we have found it, the given partition's data is no longer needed and would only eat up resources.
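
A minimal usage sketch, with a hypothetical topic name and lookup condition: we scan all partitions and stop the entire iteration once the record of interest is found.

iterator = Karafka::Pro::Iterator.new(%w[events])

iterator.each do |message, iter|
  next unless message.raw_payload.include?('needle')

  puts "Found at #{message.topic}/#{message.partition}, offset #{message.offset}"
  # The remaining data is not needed, so we stop the whole iteration early
  iter.stop
end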



# File 'lib/karafka/pro/iterator.rb', line 64

def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets
    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end

#mark_as_consumed(message) ⇒ Object

Marks the given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 135

def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end

#mark_as_consumed!(message) ⇒ Object

Marks the given message as consumed and commits offsets.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 143

def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
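
A minimal sketch of offset marking during iteration; the topic name and the persist method are hypothetical. Offsets stored with #mark_as_consumed are committed when #each finishes, while #mark_as_consumed! commits synchronously right away:

iterator = Karafka::Pro::Iterator.new(%w[events])

iterator.each do |message, iter|
  persist(message.payload)

  # Store the offset; it will be committed when #each finishes
  iter.mark_as_consumed(message)

  # Or commit immediately if losing progress is not acceptable:
  # iter.mark_as_consumed!(message)
end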

#stop ⇒ Object

Note:

break can also be used, but in such cases offsets stored asynchronously will not be flushed to Kafka. This is why #stop is the recommended method.

Stops all the iterating.



# File 'lib/karafka/pro/iterator.rb', line 128

def stop
  @stopped = true
end

#stop_current_partition ⇒ Object

Stops the partition we are currently yielded into.



# File 'lib/karafka/pro/iterator.rb', line 99

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end
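
A minimal sketch with a hypothetical topic name: we collect the first message from each partition and then stop that partition while the others continue streaming.

first_messages = []

Karafka::Pro::Iterator.new(%w[events]).each do |message, iter|
  first_messages << message
  # This partition has served its purpose; the remaining ones keep going
  iter.stop_current_partition
end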

#stop_partition(name, partition) ⇒ Object

Stops processing of a given partition. We expect the topic name and partition to be provided explicitly because of the scenario where there is a multi-partition iteration and we want to stop a different partition than the one that is currently yielded.

We pause it forever and no longer work with it.

Parameters:

  • name (String)

    name of the topic whose partition we want to stop

  • partition (Integer)

    partition we want to stop processing



# File 'lib/karafka/pro/iterator.rb', line 115

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
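
A minimal sketch with a hypothetical topic, partition and sentinel value: a message seen in any partition tells us that partition 0 is no longer needed, so we stop it explicitly.

Karafka::Pro::Iterator.new(%w[events]).each do |message, iter|
  # Stop a different partition than the one currently yielded
  iter.stop_partition('events', 0) if message.raw_payload == 'done'
end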