Module: Karafka::Testing::Minitest::Helpers

Defined in:
lib/karafka/testing/minitest/helpers.rb

Overview

Minitest helpers module that needs to be included

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object

Adds all the needed extra functionalities to the minitest group

Parameters:

  • base (Class)

    Minitest example group we want to extend



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/karafka/testing/minitest/helpers.rb', line 19

def included(base)
  eval_flow = lambda do
    @karafka = Karafka::Testing::Minitest::Proxy.new(self)
    @_karafka_consumer_messages = []
    @_karafka_consumer_client = Karafka::Testing::SpecConsumerClient.new
    @_karafka_producer_client = Karafka::Testing::SpecProducerClient.new(self)

    @_karafka_consumer_messages.clear
    @_karafka_producer_client.reset

    Karafka.producer.stubs(:client).returns(@_karafka_producer_client)
  end

  if base.to_s == 'Minitest::Spec'
    base.class_eval do
      before(&eval_flow)
    end
  else
    base.class_eval do
      setup(&eval_flow)
    end
  end
end

Instance Method Details

#_karafka_add_message_to_consumer_if_needed(message) ⇒ Object

Adds a new Karafka message instance if needed with given payload and options into an internal consumer buffer that will be used to simulate messages delivery to the consumer

Examples:

Send a json message to consumer

@karafka.produce({ 'hello' => 'world' }.to_json)

Send a json message to consumer and simulate, that it is partition 6

@karafka.produce({ 'hello' => 'world' }.to_json, 'partition' => 6)

Parameters:

  • message (Hash)

    message that was sent to Kafka



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/karafka/testing/minitest/helpers.rb', line 78

def _karafka_add_message_to_consumer_if_needed(message)
  # Consumer needs to be defined in order to pass messages to it
  return unless defined?(@consumer)
  # We're interested in adding message to consumer only when it is a Karafka consumer
  # Users may want to test other things (models producing messages for example) and in
  # their case consumer will not be a consumer
  return unless @consumer.is_a?(Karafka::BaseConsumer)
  # We target to the consumer only messages that were produced to it, since specs may also
  # produce other messages targeting other topics
  return unless message[:topic] == @consumer.topic.name

  # Build message metadata and copy any metadata that would come from the message
   = 

  .keys.each do |key|
    next unless message.key?(key)

    [key] = message.fetch(key)
  end
  # Add this message to previously produced messages
  @_karafka_consumer_messages << Karafka::Messages::Message.new(
    message[:payload],
    Karafka::Messages::Metadata.new().freeze
  )
  # Update batch metadata
   = Karafka::Messages::Builders::BatchMetadata.call(
    @_karafka_consumer_messages,
    @consumer.topic,
    0,
    Time.now
  )

  # Update consumer messages batch
  @consumer.messages = Karafka::Messages::Messages.new(
    @_karafka_consumer_messages,
    
  )
end

#_karafka_consumer_for(requested_topic, requested_consumer_group = nil) ⇒ Object

Creates a consumer instance for a given topic

Examples:

Creates a consumer instance with settings for my_requested_topic

consumer = @karafka.consumer_for(:my_requested_topic)

Parameters:

  • requested_topic (String, Symbol)

    name of the topic for which we want to create a consumer instance

  • requested_consumer_group (String, Symbol, nil) (defaults to: nil)

    optional name of the consumer group if we have multiple consumer groups listening on the same topic

Returns:

  • (Object)

    Karafka consumer instance

Raises:



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/karafka/testing/minitest/helpers.rb', line 56

def _karafka_consumer_for(requested_topic, requested_consumer_group = nil)
  selected_topics = Testing::Helpers.karafka_consumer_find_candidate_topics(
    requested_topic.to_s,
    requested_consumer_group.to_s
  )

  raise Errors::TopicInManyConsumerGroupsError, requested_topic if selected_topics.size > 1
  raise Errors::TopicNotFoundError, requested_topic if selected_topics.empty?

  _karafka_build_consumer_for(selected_topics.first)
end

#_karafka_produce(payload, metadata = {}) ⇒ Object

Produces message with a given payload to the consumer matching topic

Parameters:

  • payload (String)

    payload we want to dispatch

  • metadata (Hash) (defaults to: {})

    any metadata we want to dispatch alongside the payload



120
121
122
123
124
125
126
127
# File 'lib/karafka/testing/minitest/helpers.rb', line 120

def _karafka_produce(payload,  = {})
  Karafka.producer.produce_sync(
    {
      topic: @consumer.topic.name,
      payload: payload
    }.merge()
  )
end

#_karafka_produced_messagesArray<Hash>

Returns messages that were produced.

Returns:

  • (Array<Hash>)

    messages that were produced



130
131
132
# File 'lib/karafka/testing/minitest/helpers.rb', line 130

def _karafka_produced_messages
  @_karafka_producer_client.messages
end