Module: Karafka::Testing::RSpec::Helpers

Defined in:
lib/karafka/testing/rspec/helpers.rb

Overview

RSpec helpers module. Include it in your RSpec example groups to get the karafka proxy and the fake Kafka clients used during specs.

Class Method Details

.included(base) ⇒ Object

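Extends the RSpec example group with the karafka proxy, the fake consumer and producer clients, and a before hook that resets their state and stubs the Karafka.producer client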

Parameters:

  • base (Class)

    RSpec example group we want to extend



# File 'lib/karafka/testing/rspec/helpers.rb', line 29

def included(base)
  # RSpec local reference to Karafka proxy that allows us to build the consumer instance
  base.let(:karafka) { Karafka::Testing::RSpec::Proxy.new(self) }

  # Messages that are targeted to the consumer
  # You can produce many messages from Karafka during specs and not all should go to the
  # consumer for processing. This buffer holds only those that should go to consumer
  base.let(:_karafka_consumer_messages) { [] }
  # Consumer fake client to mock communication with Kafka
  base.let(:_karafka_consumer_client) { Karafka::Testing::SpecConsumerClient.new }
  # Producer fake client to mock communication with Kafka
  base.let(:_karafka_producer_client) { Karafka::Testing::SpecProducerClient.new(self) }

  base.prepend_before do
    Karafka::Testing.ensure_karafka_initialized!

    _karafka_consumer_messages.clear
    _karafka_producer_client.reset
    @_karafka_consumer_mappings = {}

    # We check not only that Mocha is present but also that it is in use and that its
    # patches are available, because some users have Mocha in their supply chain but do
    # not use it when running Karafka specs. Without this check, `karafka-testing`
    # would falsely assume that Mocha is in use.
    if Object.const_defined?('Mocha', false) && Karafka.producer.respond_to?(:stubs)
      Karafka.producer.stubs(:client).returns(_karafka_producer_client)
    else
      allow(Karafka.producer).to receive(:client).and_return(_karafka_producer_client)
    end
  end
end
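
The hook above relies on Ruby's `included` callback to register memoized helpers and a setup step on the host group. The following is a minimal plain-Ruby sketch of that pattern only. `FakeGroup`, its `let` and `prepend_before` methods, and `DemoHelpers` are hypothetical stand-ins written for illustration; they are not RSpec's or Karafka's implementation.

```ruby
# Hypothetical stand-in for an RSpec example group: supports memoized
# `let`-style helpers and setup hooks that run before each "example".
class FakeGroup
  def self.hooks
    @hooks ||= []
  end

  # Defines a memoized instance method, loosely mimicking RSpec's `let`
  def self.let(name, &block)
    define_method(name) do
      @memo ||= {}
      @memo.fetch(name) { @memo[name] = instance_exec(&block) }
    end
  end

  # Registers a hook that runs ahead of previously registered hooks
  def self.prepend_before(&block)
    hooks.unshift(block)
  end

  def run_hooks
    self.class.hooks.each { |hook| instance_exec(&hook) }
  end
end

# Hypothetical helpers module with the same shape as `.included` above
module DemoHelpers
  def self.included(base)
    # Per-instance buffer, created lazily on first access
    base.let(:buffer) { [] }

    # Reset state before every "example"
    base.prepend_before do
      buffer.clear
      @mappings = {}
    end
  end
end

class DemoGroup < FakeGroup
  include DemoHelpers

  attr_reader :mappings
end

group = DemoGroup.new
group.buffer << :stale
group.run_hooks
```

After `run_hooks`, the memoized buffer is empty again and the mappings hash is reset, which mirrors what the real hook does with the consumer messages buffer and the consumer mappings.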

Instance Method Details

#_karafka_add_message_to_consumer_if_needed(message) ⇒ Object

Builds a Karafka message from the given payload and options and, if needed, adds it to the internal consumer buffer that simulates message delivery to the consumer

Examples:

Send a json message to consumer

before do
  karafka.produce({ 'hello' => 'world' }.to_json)
end

Send a JSON message to the consumer and simulate that it comes from partition 6

before do
  karafka.produce({ 'hello' => 'world' }.to_json, partition: 6)
end

Parameters:

  • message (Hash)

    message that was sent to Kafka



# File 'lib/karafka/testing/rspec/helpers.rb', line 101

def _karafka_add_message_to_consumer_if_needed(message)
  consumer_obj = if defined?(consumer)
                   consumer
                 else
                   @_karafka_consumer_mappings&.dig(message[:topic])
                 end
  # Consumer needs to be defined in order to pass messages to it
  return unless consumer_obj
  # We're interested in adding message to consumer only when it is a Karafka consumer
  # Users may want to test other things (models producing messages for example) and in
  # their case consumer will not be a consumer
  return unless consumer_obj.is_a?(Karafka::BaseConsumer)
  # We target to the consumer only messages that were produced to it, since specs may also
  # produce other messages targeting other topics
  return unless message[:topic] == consumer_obj.topic.name

  # Build message metadata and copy any metadata that would come from the message
  metadata = _karafka_message_metadata_defaults(consumer_obj)

  metadata.keys.each do |key|
    message_key = METADATA_DISPATCH_MAPPINGS.fetch(key, key)

    next unless message.key?(message_key)

    metadata[key] = message.fetch(message_key)
  end

  # Add this message to previously produced messages
  _karafka_consumer_messages << Karafka::Messages::Message.new(
    message[:payload],
    Karafka::Messages::Metadata.new(metadata)
  )

  # Update batch metadata
  batch_metadata = Karafka::Messages::Builders::BatchMetadata.call(
    _karafka_consumer_messages,
    consumer_obj.topic,
    0,
    Time.now
  )

  # Update consumer messages batch
  consumer_obj.messages = Karafka::Messages::Messages.new(
    _karafka_consumer_messages,
    batch_metadata
  )
end
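
The metadata-copy step in the method above can be sketched in isolation with plain hashes. This is a minimal illustration, assuming a mapping in which the metadata keys `raw_key` and `raw_headers` correspond to `key` and `headers` in the dispatched message; the mapping values, the defaults hash, and the `build_metadata` helper are assumptions made for this example and are not taken from this page.

```ruby
# Assumed mapping for illustration: metadata key => key in the dispatched message
METADATA_DISPATCH_MAPPINGS = {
  raw_key: :key,
  raw_headers: :headers
}.freeze

# Hypothetical helper: starts from defaults and overrides every key that the
# dispatched message provides, translating key names through the mapping.
def build_metadata(defaults, message)
  metadata = defaults.dup

  metadata.keys.each do |key|
    message_key = METADATA_DISPATCH_MAPPINGS.fetch(key, key)

    # Keep the default when the message does not carry this attribute
    next unless message.key?(message_key)

    metadata[key] = message.fetch(message_key)
  end

  metadata
end

# Hypothetical defaults; in the real helper they are derived from the consumer
defaults = { topic: 'events', partition: 0, raw_key: nil, raw_headers: {} }
message = { topic: 'events', payload: '{}', partition: 6, key: 'user-1' }

result = build_metadata(defaults, message)
```

Only keys already present in the defaults are copied, so the payload stays out of the metadata and is passed to the message separately, as in the method above.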

#_karafka_consumer_for(requested_topic, requested_consumer_group = nil) ⇒ Object

Creates a consumer instance for a given topic

Examples:

Creates a MyConsumer consumer instance with settings for my_requested_topic

RSpec.describe MyConsumer do
  subject(:consumer) { karafka.consumer_for(:my_requested_topic) }
end

Parameters:

  • requested_topic (String, Symbol)

    name of the topic for which we want to create a consumer instance

  • requested_consumer_group (String, Symbol, nil) (defaults to: nil)

    optional name of the consumer group if we have multiple consumer groups listening on the same topic

Returns:

  • (Object)

    Karafka consumer instance

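Raises:

  • (Errors::TopicInManyConsumerGroupsError)

    when more than one candidate topic matches the requested topic and consumer group

  • (Errors::TopicNotFoundError)

    when no candidate topic matches the requested topic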



# File 'lib/karafka/testing/rspec/helpers.rb', line 76

def _karafka_consumer_for(requested_topic, requested_consumer_group = nil)
  selected_topics = Testing::Helpers.karafka_consumer_find_candidate_topics(
    requested_topic.to_s,
    requested_consumer_group.to_s
  )

  raise Errors::TopicInManyConsumerGroupsError, requested_topic if selected_topics.size > 1
  raise Errors::TopicNotFoundError, requested_topic if selected_topics.empty?

  _karafka_build_consumer_for(selected_topics.first)
end
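
The selection rule above (exactly one candidate topic, otherwise an error) can be illustrated with a plain-Ruby sketch. The `ROUTES` table, `find_candidates`, `select_topic`, and the two error classes below are hypothetical stand-ins; how the real `karafka_consumer_find_candidate_topics` filters by consumer group is an assumption here. Note that `nil.to_s` is an empty string, so an omitted group arrives as `''`.

```ruby
# Hypothetical error classes mirroring the two failure modes above
TopicNotFoundError = Class.new(StandardError)
TopicInManyConsumerGroupsError = Class.new(StandardError)

# Hypothetical routing table: consumer group name => topic names
ROUTES = {
  'group_a' => %w[orders payments],
  'group_b' => %w[orders]
}.freeze

# Returns [group, topic] pairs matching the topic and, when one is given,
# the consumer group. An empty group string means "any group" (assumption).
def find_candidates(topic, group = '')
  ROUTES.flat_map do |group_name, topics|
    next [] unless group.empty? || group == group_name

    topics.select { |name| name == topic }.map { |name| [group_name, name] }
  end
end

# Accepts strings or symbols and insists on exactly one match
def select_topic(topic, group = nil)
  candidates = find_candidates(topic.to_s, group.to_s)

  raise TopicInManyConsumerGroupsError, topic.to_s if candidates.size > 1
  raise TopicNotFoundError, topic.to_s if candidates.empty?

  candidates.first
end

unique = select_topic(:payments)
scoped = select_topic(:orders, :group_b)
```

Passing the consumer group narrows an otherwise ambiguous topic down to a single candidate, which is the reason for the optional second argument.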

#_karafka_produce(payload, metadata = {}) ⇒ Object

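Produces a message with the given payload to the topic matching the consumer. The topic is taken from metadata[:topic] when present, otherwise from the consumer defined in the example, otherwise from the most recently added entry in the consumer mappings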

Parameters:

  • payload (String)

    payload we want to dispatch

  • metadata (Hash) (defaults to: {})

    any metadata we want to dispatch alongside the payload



# File 'lib/karafka/testing/rspec/helpers.rb', line 152

def _karafka_produce(payload, metadata = {})
  topic = if metadata[:topic]
            metadata[:topic]
          elsif defined?(consumer)
            consumer.topic.name
          else
            @_karafka_consumer_mappings&.keys&.last
          end
  Karafka.producer.produce_sync(
    {
      topic: topic,
      payload: payload
    }.merge(metadata)
  )
  )
end
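
The topic fallback order used by the method above can be shown with plain values. In this sketch, `resolve_topic` and `build_message` are hypothetical helpers, and `consumer_topic` and `mappings` are simplified stand-ins for the consumer in scope and the consumer mappings hash.

```ruby
# Resolves the target topic in the same order as the method above:
# explicit metadata first, then the consumer in scope, then the last mapping.
def resolve_topic(metadata, consumer_topic: nil, mappings: nil)
  return metadata[:topic] if metadata[:topic]
  return consumer_topic if consumer_topic

  # Hashes keep insertion order, so this is the most recently added key
  mappings&.keys&.last
end

# Builds the message hash that would be handed to the producer
def build_message(payload, metadata, consumer_topic: nil, mappings: nil)
  topic = resolve_topic(metadata, consumer_topic: consumer_topic, mappings: mappings)

  { topic: topic, payload: payload }.merge(metadata)
end

from_consumer = build_message('{}', { partition: 6 }, consumer_topic: 'orders')
from_metadata = build_message('{}', { topic: 'audit' }, consumer_topic: 'orders')
from_mappings = build_message('{}', {}, mappings: { 'a' => :x, 'b' => :y })
```

Because the metadata hash is merged last, any attribute it carries (including an explicit topic) overrides the values computed from the consumer.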

#_karafka_produced_messages ⇒ Array<Hash>

Returns messages that were produced.

Returns:

  • (Array<Hash>)

    messages that were produced



# File 'lib/karafka/testing/rspec/helpers.rb', line 169

def _karafka_produced_messages
  _karafka_producer_client.messages
end
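
The reader above simply exposes the buffer held by the fake producer client. The following plain-Ruby sketch shows the general idea of such a buffering client. `BufferingClient` and its `produce` method are hypothetical and written for illustration; only the `messages` and `reset` names appear in the source on this page, and the real client's internals are not shown here.

```ruby
# Hypothetical in-memory producer client: records messages instead of
# sending them anywhere, so specs can inspect what was dispatched.
class BufferingClient
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Stores the message hash and returns self to allow chaining
  def produce(message)
    @messages << message
    self
  end

  # Drops everything recorded so far; meant to run before each example
  def reset
    @messages.clear
  end
end

client = BufferingClient.new
client.produce(topic: 'orders', payload: '{"id":1}')
client.produce(topic: 'audit', payload: '{"id":2}')

topics = client.messages.map { |message| message[:topic] }
```

Resetting the buffer before every example, as the before hook in this module does, keeps messages produced in one example from leaking into the next.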