Class: Karafka::Connection::MessagesBuffer

Inherits:
Object
Defined in:
lib/karafka/connection/messages_buffer.rb

Overview

Note:

This buffer is NOT thread safe. That is acceptable because it is only used inside the main listener loop. It can be cleared once jobs have been scheduled with the messages it stores: the message arrays themselves are never emptied in place, so the jobs keep valid references to them.

Buffer used to build and store Karafka messages from raw librdkafka messages.

Why do we have two buffers? RawMessagesBuffer stores raw messages and handles cases related to partition revocation and reconnections. It is “internal” to the listening process. MessagesBuffer, on the other hand, “translates” the raw messages that we know are ok into Karafka messages and simplifies further work with them.

While it adds a bit of overhead, it makes things conceptually much easier, and the cost is only two simple hash iterations over the messages batch.
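
The note about clearing can be illustrated with plain Ruby hashes. This is a minimal sketch, assuming that clearing drops the buffer's hash entries rather than emptying the arrays in place, as the note describes; the topic name 'events' and the variable names are hypothetical.

```ruby
# Hypothetical buffer content: topic => partition => details.
groups = { 'events' => { 0 => { eof: false, messages: %w[a b] } } }

# A scheduled job holds its own reference to the messages array.
job_messages = groups['events'][0][:messages]

# Clearing the hash removes the buffer's references only; the array is untouched.
groups.clear

p groups.empty? # => true
p job_messages  # => ["a", "b"]
```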

Constructor Details

#initialize(subscription_group) ⇒ MessagesBuffer

Returns a new instance of MessagesBuffer.

Parameters:

  • subscription_group

    subscription group whose topics are looked up when building messages



# File 'lib/karafka/connection/messages_buffer.rb', line 23

def initialize(subscription_group)
  @subscription_group = subscription_group
  @size = 0

  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = {
        eof: false,
        messages: []
      }
    end
  end
end
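
The nested default blocks in the constructor can be tried in isolation. The sketch below copies only the hash structure; the topic name 'events' and the symbols used as messages are placeholders, not Karafka objects.

```ruby
# Standalone copy of the nested default-block structure from the constructor.
groups = Hash.new do |topic_groups, topic|
  topic_groups[topic] = Hash.new do |partition_groups, partition|
    partition_groups[partition] = { eof: false, messages: [] }
  end
end

# Reading a missing key creates and stores the entry on first access.
groups['events'][0][:messages] << :m1
groups['events'][0][:messages] << :m2

# Each partition gets its own array, so partitions do not share state.
groups['events'][1][:messages] << :m3

p groups.keys                    # => ["events"]
p groups['events'].keys          # => [0, 1]
p groups['events'][0][:messages] # => [:m1, :m2]
```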

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



# File 'lib/karafka/connection/messages_buffer.rb', line 20

def size
  @size
end

Instance Method Details

#each {|topic, partition, messages, eof, last_polled_at| ... } ⇒ Object

Iterates over the messages of every topic partition in the buffer.

Yield Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • messages (Array<Karafka::Messages::Message>)

    messages from a given topic partition

  • eof (Boolean)

    true if eof, false otherwise

  • last_polled_at (Float)

    monotonic clock time of the last poll



# File 'lib/karafka/connection/messages_buffer.rb', line 75

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof], details[:last_polled_at])
    end
  end
end
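
A usage sketch for the five yielded values follows. SketchBuffer is a hypothetical stand-in that reproduces only the #each body shown above so the example runs without Karafka; the strings used as messages and the 100.0 timestamps are made-up data.

```ruby
# Hypothetical stand-in mirroring only the #each shown above; not Karafka's class.
class SketchBuffer
  def initialize(groups)
    @groups = groups
  end

  def each
    @groups.each do |topic, partitions|
      partitions.each do |partition, details|
        yield(topic, partition, details[:messages], details[:eof], details[:last_polled_at])
      end
    end
  end
end

groups = {
  'events' => {
    0 => { eof: false, messages: %w[a b], last_polled_at: 100.0 },
    1 => { eof: true, messages: [], last_polled_at: 100.0 }
  }
}
buffer = SketchBuffer.new(groups)

# Collect one summary row per topic partition.
summary = []
buffer.each do |topic, partition, messages, eof, last_polled_at|
  summary << [topic, partition, messages.size, eof, last_polled_at]
end

p summary
```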

#empty? ⇒ Boolean

Returns whether the buffer is empty, that is, contains no messages.

Returns:

  • (Boolean)

    true if the buffer contains no messages, otherwise false



# File 'lib/karafka/connection/messages_buffer.rb', line 96

def empty?
  @size.zero?
end

#present?(topic, partition) ⇒ Boolean

Checks if there are any messages from a given topic partition in the buffer

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:

  • (Boolean)

    true if there is at least one message from this topic partition, otherwise false



# File 'lib/karafka/connection/messages_buffer.rb', line 88

def present?(topic, partition)
  return false unless @groups.include?(topic)
  return false unless @groups[topic].include?(partition)

  true
end
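
One likely reason #present? uses include? instead of indexing is that indexing would run the default blocks set up in the constructor and store empty entries. The sketch below shows that difference on a standalone copy of the hash structure; the topic name 'events' is a placeholder.

```ruby
groups = Hash.new do |topic_groups, topic|
  topic_groups[topic] = Hash.new do |partition_groups, partition|
    partition_groups[partition] = { eof: false, messages: [] }
  end
end

# include? only inspects existing keys and never runs the default block.
present = groups.include?('events') && groups['events'].include?(0)
keys_after_include = groups.keys.dup

# Indexing with [] runs the default blocks and stores new entries as a side effect.
groups['events'][0]
keys_after_index = groups.keys.dup

p present            # => false
p keys_after_include # => []
p keys_after_index   # => ["events"]
```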

#remap(raw_messages_buffer) ⇒ Object

Remaps raw messages from the raw messages buffer to Karafka messages

Parameters:

  • raw_messages_buffer

    buffer with the raw messages to translate into Karafka messages



# File 'lib/karafka/connection/messages_buffer.rb', line 39

def remap(raw_messages_buffer)
  clear

  # Since it happens "right after" we've received the messages, it is close enough in time
  # to be used as the moment we received messages.
  received_at = Time.now
  last_polled_at = raw_messages_buffer.last_polled_at

  raw_messages_buffer.each do |topic, partition, messages, eof|
    @size += messages.count

    ktopic = @subscription_group.topics.find(topic)

    built_messages = messages.map do |message|
      Messages::Builders::Message.call(
        message,
        ktopic,
        received_at
      )
    end

    @groups[topic][partition] = {
      eof: eof,
      messages: built_messages,
      last_polled_at: last_polled_at
    }
  end
end
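
The remap flow can be approximated without Karafka. In this minimal sketch every name (RawMessage, BuiltMessage, SketchRawBuffer, remap_sketch) is a hypothetical stand-in: BuiltMessage.new replaces the Messages::Builders::Message.call step, and the topic name is passed through directly instead of being resolved via the subscription group.

```ruby
# Hypothetical stand-ins; none of these are Karafka classes.
RawMessage = Struct.new(:payload)
BuiltMessage = Struct.new(:raw, :topic, :received_at)

class SketchRawBuffer
  attr_reader :last_polled_at

  def initialize(groups, last_polled_at)
    @groups = groups
    @last_polled_at = last_polled_at
  end

  # Yields four values per topic partition, like the raw buffer used by #remap.
  def each
    @groups.each do |topic, partitions|
      partitions.each do |partition, details|
        yield(topic, partition, details[:messages], details[:eof])
      end
    end
  end
end

# Builds a fresh topic => partition => details structure and counts messages.
def remap_sketch(raw_buffer)
  received_at = Time.now # one timestamp shared by the whole batch
  size = 0
  groups = Hash.new { |hash, topic| hash[topic] = {} }

  raw_buffer.each do |topic, partition, messages, eof|
    size += messages.count
    groups[topic][partition] = {
      eof: eof,
      messages: messages.map { |message| BuiltMessage.new(message, topic, received_at) },
      last_polled_at: raw_buffer.last_polled_at
    }
  end

  [groups, size]
end

raw_groups = {
  'events' => { 0 => { eof: false, messages: [RawMessage.new('a'), RawMessage.new('b')] } }
}
groups, size = remap_sketch(SketchRawBuffer.new(raw_groups, 12.5))

p size                               # => 2
p groups['events'][0][:last_polled_at] # => 12.5
```

All messages built in one call share a single received_at value, which matches the comment in #remap about using one timestamp taken right after the messages arrive.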