Class: Karafka::Pro::Iterator
Inherits: Object
- Object
  - Karafka::Pro::Iterator
Defined in:
- lib/karafka/pro/iterator.rb
- lib/karafka/pro/iterator/expander.rb
- lib/karafka/pro/iterator/tpl_builder.rb
Overview
The topic iterator lets you iterate over topic/partition data and look up the information you need.
It supports stopping early once the requested data is found and also allows seeking until the end. It can also signal that a given message should be the last one taken from a certain partition while iteration continues over the other partitions.
It does not create a consumer group and does not perform any offset management until the first consumer offset marking happens, so it can be used for quick seeks as well as for iterative, repetitive data fetching from rake, etc.
Defined Under Namespace
Classes: Expander, TplBuilder
Instance Method Summary
-
#each ⇒ Object
Iterates over the requested topic partitions and yields the results together with the iterator itself. The iterator instance is yielded so that stop_partition can be called to stop iterating over part of the data.
-
#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
constructor
A simple API allowing to iterate over topic/partition data, without having to subscribe and deal with rebalances.
-
#mark_as_consumed(message) ⇒ Object
Marks given message as consumed.
-
#mark_as_consumed!(message) ⇒ Object
Marks given message as consumed and commits offsets.
-
#stop ⇒ Object
Stops all the iterating.
-
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into.
-
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. The partition has to be provided explicitly because in a multi-partition iteration we may want to stop a different partition than the one that is currently yielded.
Constructor Details
#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
Keep in mind that this API also needs to operate within the max.poll.interval.ms limitations on each iteration.
For a never-ending iterator, you need to set enable.partition.eof to false so that polling does not stop when the end is reached (the end at a given moment).
A simple API for iterating over topic/partition data without having to subscribe and deal with rebalances. This API allows for multi-partition streaming and is optimized for data lookups. Iteration over any partition can be stopped explicitly during the iteration process, which keeps lookups efficient.
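The note on never-ending iteration can be sketched as a settings hash. This is a minimal illustration, not a recommended configuration: the two keys come from this page, while the topic name in the commented usage line is made up. Because settings is a keyword argument with a default, passing your own hash replaces the default one, so the offset reset policy is repeated explicitly.

```ruby
# Settings for an iterator that should keep polling after reaching the
# current end of a partition, as described in the note above.
# Passing settings: replaces the default hash, so 'auto.offset.reset'
# is repeated here explicitly.
settings = {
  'auto.offset.reset': 'beginning',
  'enable.partition.eof': false
}

# Hypothetical usage (needs Karafka Pro and a reachable cluster; 'events'
# is a made-up topic name):
# iterator = Karafka::Pro::Iterator.new(['events'], settings: settings)

p settings
```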
# File 'lib/karafka/pro/iterator.rb', line 38
def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end
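To make the partition bookkeeping in the constructor concrete, here is a small sketch in plain Ruby. It assumes the expanded structure maps each topic name to a hash of partition to starting offset. That shape is an assumption made for illustration; this page does not show what Expander actually returns.

```ruby
# Hypothetical expanded structure: topic name => { partition => starting offset }.
# The real shape returned by Expander may differ.
topics_with_partitions = {
  'events' => { 0 => 0, 1 => 0, 2 => 0 },
  'logs' => { 0 => 100 }
}

# Same expression as in the constructor: take the value of each
# [name, partitions] pair and sum their sizes to get the number of
# partitions that will be iterated over.
total_partitions = topics_with_partitions.map(&:last).sum(&:count)

puts total_partitions
```

The resulting count is what the stopped-partitions counter is later compared against, which is why both are initialized together.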
Instance Method Details
#each ⇒ Object
Iterates over the requested topic partitions and yields the results together with the iterator itself. The iterator instance is yielded so that stop_partition can be called to stop iterating over part of the data. This is useful when we are looking for some information in all the partitions: once it has been found in a given partition, that partition's data is no longer needed and would only consume resources.
# File 'lib/karafka/pro/iterator.rb', line 64
def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets
    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
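The effect of yield_nil on the polling loop can be sketched with an in-memory stand-in. PollLoopSketch below is hypothetical and is not the Karafka implementation: it replaces the consumer with an array of pre-canned poll results, where nil means that nothing arrived within the wait time.

```ruby
# Hypothetical stand-in for the iterator's polling loop. `polls` is an array
# of pre-canned poll results; nil means "no message arrived in time".
class PollLoopSketch
  def initialize(polls, yield_nil: false)
    @polls = polls.dup
    @yield_nil = yield_nil
  end

  def each
    until @polls.empty?
      message = @polls.shift

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      yield(message, self)
    end
  end
end

polls = ['a', nil, 'b', nil]

# Default behaviour: empty polls are skipped silently.
skipped = []
PollLoopSketch.new(polls).each { |message, _iterator| skipped << message }

# With yield_nil: true the block also runs on empty polls, which gives the
# caller a chance to act (for example to stop) when no data is flowing.
with_nils = []
PollLoopSketch.new(polls, yield_nil: true).each { |message, _iterator| with_nils << message }

p skipped
p with_nils
```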
#mark_as_consumed(message) ⇒ Object
Marks given message as consumed.
# File 'lib/karafka/pro/iterator.rb', line 135
def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end
#mark_as_consumed!(message) ⇒ Object
Marks given message as consumed and commits offsets
# File 'lib/karafka/pro/iterator.rb', line 143
def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
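The difference between the two marking methods is easier to see with a consumer that records calls instead of talking to Kafka. RecordingConsumer and MarkingSketch are hypothetical names used only for this illustration. The marking methods mirror the listings above, and finish stands in for the commit that #each performs once iteration ends.

```ruby
# Hypothetical consumer that records calls instead of talking to Kafka.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def store_offset(message, _metadata = nil)
    @calls << [:store_offset, message]
  end

  def commit_offsets(async: true)
    @calls << [:commit_offsets, async]
  end
end

# Mirrors the two marking methods against the recording consumer.
class MarkingSketch
  def initialize(consumer)
    @current_consumer = consumer
    @stored_offsets = false
  end

  # Only stores the offset locally; nothing is committed yet.
  def mark_as_consumed(message)
    @current_consumer.store_offset(message, nil)
    @stored_offsets = true
  end

  # Stores the offset and commits synchronously right away.
  def mark_as_consumed!(message)
    mark_as_consumed(message)
    @current_consumer.commit_offsets(async: false)
  end

  # Stands in for the commit that #each performs after the loop ends.
  def finish
    @current_consumer.commit_offsets(async: false) if @stored_offsets
  end
end

consumer = RecordingConsumer.new
sketch = MarkingSketch.new(consumer)
sketch.mark_as_consumed(:m1)
sketch.mark_as_consumed!(:m2)
sketch.finish

p consumer.calls
```

In this sketch the first call only stores an offset, the second stores and commits, and the final commit runs because at least one offset was stored.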
#stop ⇒ Object
Stops all the iterating.
break can also be used, but in that case commits stored async will not be flushed to Kafka. This is why #stop is the recommended method.
# File 'lib/karafka/pro/iterator.rb', line 128
def stop
  @stopped = true
end
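Why #stop is preferred over break follows from plain Ruby semantics: break inside a block terminates the method that yielded, so any code after the loop in that method never runs. The sketch below uses a hypothetical FlushingLoop class whose final step stands in for the offset commit. It illustrates the mechanism and is not the Karafka code.

```ruby
# Hypothetical loop that mimics the shape of #each: yield items while not
# stopped, then run a final flush step once the loop exits normally.
class FlushingLoop
  attr_reader :flushed

  def initialize(items)
    @items = items
    @stopped = false
    @flushed = false
  end

  def stop
    @stopped = true
  end

  def each
    index = 0

    until @stopped || index >= @items.size
      yield(@items[index], self)
      index += 1
    end

    # Only reached when the loop ends on its own; a `break` from the
    # caller's block leaves this method before this line.
    @flushed = true
  end
end

with_stop = FlushingLoop.new([1, 2, 3])
with_stop.each { |item, iterator| iterator.stop if item == 2 }

with_break = FlushingLoop.new([1, 2, 3])
with_break.each { |item, _iterator| break if item == 2 }

puts with_stop.flushed
puts with_break.flushed
```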
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into
# File 'lib/karafka/pro/iterator.rb', line 99
def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. The partition has to be provided explicitly because in a multi-partition iteration we may want to stop a different partition than the one that is currently yielded.
The partition is paused forever and is no longer worked with.
# File 'lib/karafka/pro/iterator.rb', line 115
def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
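A per-partition stop during a multi-partition lookup can be sketched with an in-memory stand-in. PartitionLookupSketch and SketchMessage are hypothetical: the data lives in arrays keyed by topic and partition, pausing is a flag in a hash rather than a consumer call, and the done? condition is an assumption about when iteration should end.

```ruby
# Hypothetical message type carrying just what the sketch needs.
SketchMessage = Struct.new(:topic, :partition, :payload)

# Hypothetical in-memory iterator over { [topic, partition] => [payloads] }.
class PartitionLookupSketch
  def initialize(data)
    @data = data
    @total_partitions = data.size
    @stopped_partitions = 0
    @paused = {}
  end

  # Counts the partition as stopped and never reads from it again.
  def stop_partition(name, partition)
    @stopped_partitions += 1
    @paused[[name, partition]] = true
  end

  def stop_current_partition
    stop_partition(@current_message.topic, @current_message.partition)
  end

  def each
    queues = @data.transform_values(&:dup)

    until done?(queues)
      # Take one message per active partition in each round.
      queues.each do |(topic, partition), payloads|
        next if @paused[[topic, partition]] || payloads.empty?

        @current_message = SketchMessage.new(topic, partition, payloads.shift)
        yield(@current_message, self)
      end
    end
  end

  private

  # Assumed end condition: every partition is either stopped or drained.
  def done?(queues)
    @stopped_partitions >= @total_partitions ||
      queues.all? { |key, payloads| @paused[key] || payloads.empty? }
  end
end

data = {
  ['events', 0] => %w[a b target c],
  ['events', 1] => %w[d e f g]
}

seen = []
PartitionLookupSketch.new(data).each do |message, iterator|
  seen << [message.partition, message.payload]
  iterator.stop_current_partition if message.payload == 'target'
end

p seen
```

Once 'target' is found in partition 0, that partition is no longer read, so 'c' is never yielded, while partition 1 is still read to its end.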