Module: Karafka::Messages::Builders::BatchMetadata

Defined in:
lib/karafka/messages/builders/batch_metadata.rb

Overview

Builder for creating a batch metadata object based on the batch information.

Class Method Summary

Class Method Details

.call(messages, topic, partition, scheduled_at) ⇒ Karafka::Messages::BatchMetadata

Note:

We do not set processed_at as this needs to be assigned when the batch is picked up for processing.

Creates metadata based on the Kafka batch data.

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    messages array

  • topic (Karafka::Routing::Topic)

    topic for which we’ve fetched the batch

  • partition (Integer)

    partition of this metadata

  • scheduled_at (Time)

    moment when the batch was scheduled for processing

Returns:

  • (Karafka::Messages::BatchMetadata)

    batch metadata object

# File 'lib/karafka/messages/builders/batch_metadata.rb', line 19

def call(messages, topic, partition, scheduled_at)
  Karafka::Messages::BatchMetadata.new(
    size: messages.count,
    first_offset: messages.first&.offset || -1001,
    last_offset: messages.last&.offset || -1001,
    deserializers: topic.deserializers,
    partition: partition,
    topic: topic.name,
    # We go with the assumption that the creation of the whole batch is the last message
    # creation time
    created_at: local_created_at(messages.last),
    # When this batch was built and scheduled for execution
    scheduled_at: scheduled_at,
    # This needs to be set to a correct value prior to processing starting
    processed_at: nil
  )
end
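
The behavior of the listing above can be tried without a Karafka installation using the minimal sketch below. Message, Topic, BatchMetadata and build_batch_metadata are hypothetical stand-ins written for this illustration, not Karafka's own classes. The real method computes created_at through the local_created_at helper, which is not shown in the listing, so the sketch simply reads the last message's timestamp as an assumption. It illustrates two points from the source: an empty batch falls back to -1001 for both offsets, and processed_at stays nil until the caller assigns it.

```ruby
# Hypothetical stand-ins for illustration only; these are NOT Karafka's classes.
Message = Struct.new(:offset, :timestamp, keyword_init: true)
Topic = Struct.new(:name, :deserializers, keyword_init: true)
BatchMetadata = Struct.new(
  :size, :first_offset, :last_offset, :deserializers, :partition,
  :topic, :created_at, :scheduled_at, :processed_at,
  keyword_init: true
)

# Simplified sketch of the builder. Assumption: created_at is taken directly
# from the last message's timestamp instead of going through local_created_at.
def build_batch_metadata(messages, topic, partition, scheduled_at)
  BatchMetadata.new(
    size: messages.count,
    # Safe navigation returns nil for an empty batch, so -1001 is used instead
    first_offset: messages.first&.offset || -1001,
    last_offset: messages.last&.offset || -1001,
    deserializers: topic.deserializers,
    partition: partition,
    topic: topic.name,
    created_at: messages.last&.timestamp,
    scheduled_at: scheduled_at,
    # Left unset on purpose; assigned when processing starts
    processed_at: nil
  )
end

topic = Topic.new(name: 'events', deserializers: nil)
now = Time.now
messages = [
  Message.new(offset: 10, timestamp: now - 2),
  Message.new(offset: 11, timestamp: now - 1)
]

full = build_batch_metadata(messages, topic, 0, now)
empty = build_batch_metadata([], topic, 0, now)

# The caller assigns processed_at once the batch is picked up for processing
full.processed_at = Time.now
```

With these stand-ins, full reports offsets 10 and 11 and a size of 2, while empty reports -1001 for both offsets and a size of 0.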