Class: Karafka::Pro::Iterator::TplBuilder

Inherits:
Object
Defined in:
lib/karafka/pro/iterator/tpl_builder.rb

Overview

Offsets can be provided in several formats, so they need to be normalized before we can subscribe to the relevant topics and partitions.

In some cases, we need to query Kafka to get the real offsets or watermarks.

This builder resolves those cases and builds a topic partition list (tpl) that we can safely subscribe to in the way we want.
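
The names of the resolve_partitions_* steps invoked by #call suggest the kinds of offset input that need normalizing. A minimal sketch of telling them apart follows. It is illustrative only: `offset_kind` is a hypothetical helper that does not exist in Karafka, and the mapping from Ruby types to kinds (including treating a Symbol as a consumer group expectation) is an assumption inferred from the method names, not taken from the library's implementation.

```ruby
# Hypothetical helper (not part of Karafka): names the kind of offset input
# so that each kind could be handled by a dedicated resolution step.
# The type-to-kind mapping below is an assumption made for illustration.
def offset_kind(offset)
  case offset
  when nil     then :without_offset # no position given at all
  when Integer then offset.negative? ? :negative : :exact
  when Time    then :time           # position expressed as a timestamp
  when Symbol  then :cg_expectation # assumed marker for consumer group positions
  else raise ArgumentError, "unsupported offset: #{offset.inspect}"
  end
end

# Sample partition => offset input using made-up values
requested = { 0 => 100, 1 => -10, 2 => Time.now, 3 => nil }
kinds = requested.transform_values { |offset| offset_kind(offset) }
```

Classifying first keeps each resolution step small: only the kinds that cannot be used directly would then need a round trip to Kafka.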

Constructor Details

#initialize(consumer, expanded_topics) ⇒ TplBuilder

Returns a new instance of TplBuilder.

Parameters:

  • consumer (::Rdkafka::Consumer)

    consumer instance needed to talk with Kafka

  • expanded_topics (Hash)

    hash with expanded and normalized topics data



# File 'lib/karafka/pro/iterator/tpl_builder.rb', line 27

def initialize(consumer, expanded_topics)
  @consumer = ::Karafka::Connection::Proxy.new(consumer)
  @expanded_topics = expanded_topics
  @mapped_topics = Hash.new { |h, k| h[k] = {} }
end
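
The `Hash.new` call with a block in the constructor gives `@mapped_topics` a default value that is created and stored on first access. A short standalone sketch of that behavior, using made-up topic names and offsets:

```ruby
# A Hash whose default block creates and stores an empty Hash on first access
mapped_topics = Hash.new { |hash, key| hash[key] = {} }

# No need to initialize the per-topic hash before assigning partitions.
# The topic names and offsets here are sample values only.
mapped_topics['events'][0] = 100
mapped_topics['events'][1] = 250

# Merely reading a missing key also stores an empty Hash under it
mapped_topics['logs']

# mapped_topics now holds 'events' with partitions 0 and 1,
# plus an empty entry for 'logs'
```

Because the block assigns into the hash, a read of an unknown topic leaves an empty entry behind, which is worth keeping in mind when iterating over the result.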

Instance Method Details

#call ⇒ Rdkafka::Consumer::TopicPartitionList

Returns the final tpl we can use to subscribe.

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    final tpl we can use to subscribe



# File 'lib/karafka/pro/iterator/tpl_builder.rb', line 34

def call
  resolve_partitions_without_offsets
  resolve_partitions_with_exact_offsets
  resolve_partitions_with_negative_offsets
  resolve_partitions_with_time_offsets
  resolve_partitions_with_cg_expectations

  # Final tpl with all the data
  tpl = Rdkafka::Consumer::TopicPartitionList.new

  @mapped_topics.each do |name, partitions|
    tpl.add_topic_and_partitions_with_offsets(name, partitions)
  end

  tpl
end
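
The final assembly step in #call can be tried in isolation with a stand-in list. In this sketch, `FakeTopicPartitionList` is a hypothetical class written so the example runs without rdkafka installed. It mimics only the `add_topic_and_partitions_with_offsets` call used above, and its `to_h` is a simplification that does not match the real return type. The contents of `mapped_topics` are sample values standing in for whatever the resolution steps would have produced.

```ruby
# Stand-in for Rdkafka::Consumer::TopicPartitionList (hypothetical, for
# illustration only). It records what was added instead of talking to Kafka.
class FakeTopicPartitionList
  def initialize
    @data = {}
  end

  # Same method name and argument order as the call made in #call
  def add_topic_and_partitions_with_offsets(name, partitions)
    @data[name] = partitions.dup
  end

  # Simplified view of the recorded topics, partitions, and offsets
  def to_h
    @data
  end
end

# Sample topic => { partition => offset } state, assumed to be fully resolved
mapped_topics = { 'events' => { 0 => 100, 1 => 250 }, 'logs' => { 0 => 0 } }

tpl = FakeTopicPartitionList.new

mapped_topics.each do |name, partitions|
  tpl.add_topic_and_partitions_with_offsets(name, partitions)
end
```

Collecting everything into one hash first and building the list in a single pass at the end means the list only ever sees fully resolved offsets.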