Class: WaterDrop::Clients::Buffered

Inherits:
Dummy
  • Object
Defined in:
lib/waterdrop/clients/buffered.rb

Overview

Client used to buffer messages that we send out in specs and other places.

Methods inherited from Dummy

#method_missing, #respond_to_missing?

Constructor Details

#initialize(*args) ⇒ Buffered

Returns a new instance of Buffered.

Parameters:

  • args (Object)

    anything accepted by Clients::Dummy



# File 'lib/waterdrop/clients/buffered.rb', line 10

def initialize(*args)
  super
  @messages = []
  @topics = Hash.new { |hash, topic| hash[topic] = [] }

  @transaction_active = false
  @transaction_messages = []
  @transaction_topics = Hash.new { |hash, topic| hash[topic] = [] }
  @transaction_level = 0
end
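
The per-topic buffers rely on a Hash default block, so a topic's array is created the first time it is accessed. The following is a minimal stand-alone sketch of that pattern using a plain Hash rather than the WaterDrop class; the topic names and message hashes are made up for illustration.

```ruby
# Plain-Ruby illustration of the default-block pattern used for @topics.
# Nothing here depends on WaterDrop; names and payloads are hypothetical.
topics = Hash.new { |hash, topic| hash[topic] = [] }

# Appending to a topic that was never seen before works without setup.
topics['events'] << { topic: 'events', payload: 'a' }

# Merely reading a missing key also creates and stores an empty array for it.
topics['unknown']

p topics.keys            # => ["events", "unknown"]
p topics['events'].size  # => 1
```

One consequence of this design is that looking up a topic that was never produced to returns an empty array instead of nil, and leaves an entry for that topic behind.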

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class WaterDrop::Clients::Dummy

Instance Attribute Details

#messages ⇒ Object

Returns the value of attribute messages.



# File 'lib/waterdrop/clients/buffered.rb', line 7

def messages
  @messages
end

Instance Method Details

#abort_transaction ⇒ Object

Aborts the transaction and discards the messages buffered within it.



# File 'lib/waterdrop/clients/buffered.rb', line 70

def abort_transaction
  @transaction_level -= 1
  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end

#begin_transaction ⇒ Object

Starts a transaction and increments the transaction nesting level.



# File 'lib/waterdrop/clients/buffered.rb', line 38

def begin_transaction
  @transaction_level += 1
  @transaction_active = true
end

#commit_transaction ⇒ Object

Finishes the current transaction level and moves the messages buffered within the transaction into the main buffers.



# File 'lib/waterdrop/clients/buffered.rb', line 44

def commit_transaction
  @transaction_level -= 1

  # Transfer transactional data on success
  @transaction_topics.each do |topic, messages|
    @topics[topic] += messages
  end

  @messages += @transaction_messages

  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end
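
The three transaction methods above can be read as a staging area: messages produced while a transaction is active stay in a separate buffer until commit copies them over or abort drops them. The sketch below illustrates that flow with a hypothetical stand-in class (`TransactionBufferSketch`) that mirrors only the buffering logic shown on this page. It is not the WaterDrop class, and it leaves out the Dummy superclass, the per-topic hashes, and the nesting level.

```ruby
# Hypothetical stand-in mirroring the staging logic documented above.
# It is an illustration only, not WaterDrop::Clients::Buffered itself.
class TransactionBufferSketch
  attr_reader :messages

  def initialize
    @messages = []
    @transaction_active = false
    @transaction_messages = []
  end

  def begin_transaction
    @transaction_active = true
  end

  # Routes the message to the staging buffer while a transaction is active.
  def produce(message)
    target = @transaction_active ? @transaction_messages : @messages
    target << message
  end

  # Moves staged messages into the main buffer.
  def commit_transaction
    @messages += @transaction_messages
    @transaction_messages.clear
    @transaction_active = false
  end

  # Drops staged messages without touching the main buffer.
  def abort_transaction
    @transaction_messages.clear
    @transaction_active = false
  end
end

buffer = TransactionBufferSketch.new

buffer.begin_transaction
buffer.produce(topic: 'events', payload: 'kept')
visible_while_staged = buffer.messages.size # staged, so not visible yet
buffer.commit_transaction
visible_after_commit = buffer.messages.size

buffer.begin_transaction
buffer.produce(topic: 'events', payload: 'dropped')
buffer.abort_transaction
visible_after_abort = buffer.messages.size

p [visible_while_staged, visible_after_commit, visible_after_abort]
```

In a spec this means assertions on `messages` made inside an open transaction will not see the messages produced in it until the transaction is committed.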

#messages_for(topic) ⇒ Object

Returns messages produced to a given topic

Parameters:

  • topic (String)


# File 'lib/waterdrop/clients/buffered.rb', line 79

def messages_for(topic)
  @topics[topic]
end

#produce(message) ⇒ Dummy::Handle

“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer, or to the transaction buffer when a transaction is active.

Parameters:

  • message (Hash)

    WaterDrop::Producer#produce_sync message hash

Returns:

  • (Dummy::Handle)

    fake delivery handle that can be materialized into a report



# File 'lib/waterdrop/clients/buffered.rb', line 24

def produce(message)
  if @transaction_active
    @transaction_topics[message.fetch(:topic)] << message
    @transaction_messages << message
  else
    # We pre-validate the message payload, so topic is ensured to be present
    @topics[message.fetch(:topic)] << message
    @messages << message
  end

  super(**message.to_h)
end
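
The comment in the listing above says the topic is validated before the message reaches this method. As a small illustration of why that matters, the sketch below shows what `Hash#fetch` does when the `:topic` key is absent. It uses a plain Hash with a made-up payload and does not call WaterDrop.

```ruby
# Plain-Ruby illustration: Hash#fetch raises KeyError for a missing key,
# unlike Hash#[], which would return nil. The message hash is hypothetical.
message = { payload: 'no topic here' }

error =
  begin
    message.fetch(:topic)
    nil
  rescue KeyError => e
    e
  end

p error.class
p message[:topic]
```

Using `fetch` rather than `[]` means a message without a topic would fail loudly instead of being buffered under a nil key.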

#reset ⇒ Object

Clears the internal buffers. Used between specs so that messages do not leak from one example into the next.



# File 'lib/waterdrop/clients/buffered.rb', line 85

def reset
  @transaction_level = 0
  @transaction_active = false
  @transaction_topics.clear
  @transaction_messages.clear
  @messages.clear
  @topics.each_value(&:clear)
end
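
Note that the listing clears each per-topic array in place with `each_value(&:clear)` rather than emptying the hash. A minimal sketch of what that implies, again with a plain Hash and hypothetical data instead of the WaterDrop class:

```ruby
# Plain-Ruby illustration of clearing per-topic arrays in place.
topics = Hash.new { |hash, topic| hash[topic] = [] }
topics['events'] << { topic: 'events', payload: 'a' }

# A reference obtained before the reset, e.g. stored in a spec variable.
held = topics['events']

# Same call as in the listing above: empties every array, keeps the keys.
topics.each_value(&:clear)

p held.empty?           # => true
p topics.key?('events') # => true
```

Because the arrays are emptied in place, a reference held from before the reset reflects the cleared state as well; copy the array (for example with `dup`) if a snapshot is needed.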

#send_offsets_to_transaction(_consumer, _tpl, _timeout) ⇒ Object

Fakes storing the offset in a transactional fashion

Parameters:

  • _consumer (#consumer_group_metadata_pointer)

    any consumer from which we can obtain the librdkafka consumer group metadata pointer

  • _tpl (Rdkafka::Consumer::TopicPartitionList)

    consumer tpl for offset storage

  • _timeout (Integer)

    timeout in milliseconds



# File 'lib/waterdrop/clients/buffered.rb', line 65

def send_offsets_to_transaction(_consumer, _tpl, _timeout)
  nil
end