Class: Karafka::Pro::Iterator::Expander

Inherits:
Object
Defined in:
lib/karafka/pro/iterator/expander.rb

Overview

There are several ways to provide topic information for iteration.

This mapper normalizes that data, resolves offsets, and maps time-based offsets into the appropriate ones.

The following formats are accepted:

  • 'topic1' - a single topic name as a string

  • ['topic1', 'topic2'] - an array of topic names

  • { 'topic1' => -100 } - topic name with a negative lookup offset

  • { 'topic1' => { 0 => 5 } } - topic name with exact per-partition offsets

  • { 'topic1' => { 0 => -5 }, 'topic2' => { 1 => 5 } } - per-partition offsets, including negative ones

  • { 'topic1' => 100 } - all partitions start from offset 100

  • { 'topic1' => Time.now - 60 } - all partitions start from the message from 60 seconds ago

  • { 'topic1' => { 1 => Time.now - 60 } } - partition 1 starts from the message from 60 seconds ago

  • { 'topic1' => { 1 => true } } - picks the first offset on this consumer group (CG) for partition 1

  • { 'topic1' => true } - picks the first offset for all partitions

  • { 'topic1' => :earliest } - picks the earliest offset for all partitions

  • { 'topic1' => :latest } - picks the latest offset (high-watermark) for all partitions
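
The value attached to a topic or partition decides where iteration starts. The sketch below is a hypothetical helper, describe_offset, that is not part of Karafka. It only labels each value shape from the list above so the distinctions are easier to see; the label names are invented for illustration.

```ruby
# Hypothetical helper (not a Karafka API): labels an offset value
# according to the accepted formats listed above.
def describe_offset(value)
  case value
  when Hash then value.transform_values { |inner| describe_offset(inner) }
  when Time then :time_based
  when true then :consumer_group_offset
  when :earliest, :latest then value
  when Integer then value.negative? ? :negative_lookup : :exact_offset
  when nil then :unspecified
  else raise ArgumentError, "Unsupported offset: #{value.inspect}"
  end
end

# Hash-based definitions taken from the list above
definitions = [
  { 'topic1' => -100 },
  { 'topic1' => { 0 => -5 }, 'topic2' => { 1 => 5 } },
  { 'topic1' => Time.now - 60 },
  { 'topic1' => { 1 => true } },
  { 'topic1' => :latest }
]

definitions.each do |definition|
  labels = definition.transform_values { |details| describe_offset(details) }
  puts labels.inspect
end
```

Plain String and Array inputs carry no offsets at all, so they are left out of the sample list.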

Instance Method Summary

Instance Method Details

#call(topics) ⇒ Hash

Expands the topics we want to subscribe to with partition information when that information is not provided.

Parameters:

  • topics (Array, Hash, String)

    topic definitions

Returns:

  • (Hash)

    expanded and normalized requested topics and partitions data

# File 'lib/karafka/pro/iterator/expander.rb', line 34

def call(topics)
  expanded = Hash.new { |h, k| h[k] = {} }

  normalize_format(topics).map do |topic, details|
    if details.is_a?(Hash)
      details.each do |partition, offset|
        expanded[topic][partition] = offset
      end
    else
      partition_count(topic).times do |partition|
        # If no offsets are provided, we just start from zero
        expanded[topic][partition] = details || 0
      end
    end
  end

  expanded
end
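
To try the expansion without a Kafka cluster, the logic above can be reproduced in a stand-in class. This is a minimal sketch under stated assumptions: SketchExpander is a hypothetical name, its partition counts are supplied by hand instead of being looked up, and its normalize_format is a guess at the simplest behavior that fits the accepted formats (the real method is not shown on this page).

```ruby
# Hypothetical stand-in for the expander; needs no Kafka connection.
class SketchExpander
  # partition_counts: Hash of topic name => number of partitions (assumed input)
  def initialize(partition_counts)
    @partition_counts = partition_counts
  end

  def call(topics)
    expanded = Hash.new { |h, k| h[k] = {} }

    normalize_format(topics).each do |topic, details|
      if details.is_a?(Hash)
        # Per-partition details are copied as given
        details.each { |partition, offset| expanded[topic][partition] = offset }
      else
        # A single value (or none) is applied to every partition
        partition_count(topic).times do |partition|
          expanded[topic][partition] = details || 0
        end
      end
    end

    expanded
  end

  private

  # Assumption: String and Array inputs become a Hash with nil details
  def normalize_format(topics)
    return topics if topics.is_a?(Hash)

    Array(topics).to_h { |name| [name, nil] }
  end

  # Stubbed lookup based on the counts passed to the constructor
  def partition_count(topic)
    @partition_counts.fetch(topic)
  end
end

expander = SketchExpander.new({ 'topic1' => 2, 'topic2' => 3 })

p expander.call('topic1')                   # => {"topic1"=>{0=>0, 1=>0}}
p expander.call({ 'topic1' => -100 })       # => {"topic1"=>{0=>-100, 1=>-100}}
p expander.call({ 'topic1' => { 0 => 5 } }) # => {"topic1"=>{0=>5}}
```

Note that when per-partition details are given, only the listed partitions appear in the result; the other partitions of that topic are not added.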