Class: WaterDrop::Clients::Buffered
- Defined in:
- lib/waterdrop/clients/buffered.rb
Overview
Client used to buffer messages that we send out in specs and other places.
Instance Attribute Summary
-
#messages ⇒ Object
Returns the value of attribute messages.
Instance Method Summary
-
#abort_transaction ⇒ Object
Aborts the transaction.
-
#begin_transaction ⇒ Object
Starts the transaction on a given level.
-
#commit_transaction ⇒ Object
Finishes given level of transaction.
-
#initialize(*args) ⇒ Buffered
constructor
A new instance of Buffered.
-
#messages_for(topic) ⇒ Object
Returns messages produced to a given topic.
-
#produce(message) ⇒ Dummy::Handle
“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer.
-
#reset ⇒ Object
Clears the internal buffer. Used in between specs so messages do not leak out.
-
#send_offsets_to_transaction(_consumer, _tpl, _timeout) ⇒ Object
Fakes storing the offset in a transactional fashion.
Methods inherited from Dummy
#method_missing, #respond_to_missing?
Constructor Details
#initialize(*args) ⇒ Buffered
Returns a new instance of Buffered.
    # File 'lib/waterdrop/clients/buffered.rb', line 10
    def initialize(*args)
      super
      @messages = []
      @topics = Hash.new { |k, v| k[v] = [] }
      @transaction_active = false
      @transaction_messages = []
      @transaction_topics = Hash.new { |k, v| k[v] = [] }
      @transaction_level = 0
    end
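The two `Hash.new { ... }` calls in the constructor give every topic its own array on first access, which is what lets `produce` append with `<<` without first checking whether the topic has been seen. The following is a minimal plain-Ruby sketch of that idiom; the variable names, topic names, and payloads are illustrative assumptions and not part of WaterDrop.

```ruby
# Standalone illustration of the default-block idiom used in the constructor.
# The block receives the hash itself and the missing key (named `hash` and
# `key` here for readability).
topics = Hash.new { |hash, key| hash[key] = [] }

# The first access to an unknown key creates and stores an empty array,
# so appending works without any prior setup.
topics['events'] << { topic: 'events', payload: 'a' }
topics['events'] << { topic: 'events', payload: 'b' }

puts topics['events'].size

# A key that was never accessed is not present yet...
known_before_read = topics.key?('logs')

# ...but merely reading it stores an empty array under that key.
topics['logs']
known_after_read = topics.key?('logs')

puts known_before_read
puts known_after_read
```

One consequence worth keeping in mind: with this idiom, looking up a topic that received no messages returns an empty array and also registers that topic as a key.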
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class WaterDrop::Clients::Dummy.
Instance Attribute Details
#messages ⇒ Object
Returns the value of attribute messages.
    # File 'lib/waterdrop/clients/buffered.rb', line 7
    def messages
      @messages
    end
Instance Method Details
#abort_transaction ⇒ Object
Aborts the transaction.
    # File 'lib/waterdrop/clients/buffered.rb', line 70
    def abort_transaction
      @transaction_level -= 1
      @transaction_topics.clear
      @transaction_messages.clear
      @transaction_active = false
    end
#begin_transaction ⇒ Object
Starts the transaction on a given level.
    # File 'lib/waterdrop/clients/buffered.rb', line 38
    def begin_transaction
      @transaction_level += 1
      @transaction_active = true
    end
#commit_transaction ⇒ Object
Finishes the given level of transaction.
    # File 'lib/waterdrop/clients/buffered.rb', line 44
    def commit_transaction
      @transaction_level -= 1

      # Transfer transactional data on success
      @transaction_topics.each do |topic, messages|
        @topics[topic] += messages
      end

      @messages += @transaction_messages

      @transaction_topics.clear
      @transaction_messages.clear
      @transaction_active = false
    end
#messages_for(topic) ⇒ Object
Returns messages produced to a given topic.
    # File 'lib/waterdrop/clients/buffered.rb', line 79
    def messages_for(topic)
      @topics[topic]
    end
#produce(message) ⇒ Dummy::Handle
“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer.
    # File 'lib/waterdrop/clients/buffered.rb', line 24
    def produce(message)
      if @transaction_active
        @transaction_topics[message.fetch(:topic)] << message
        @transaction_messages << message
      else
        # We pre-validate the message payload, so topic is ensured to be present
        @topics[message.fetch(:topic)] << message
        @messages << message
      end

      super(**message.to_h)
    end
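The branch in `produce` means that messages sent inside a transaction are staged separately and only become visible through `messages` and `messages_for` after `commit_transaction`, while `abort_transaction` discards them. The sketch below reproduces just that buffering logic in a hypothetical stand-in class (`SketchBufferedClient`, a name invented here). It is not the library class: it does not inherit from `Dummy`, returns no handle, and omits the transaction level counter.

```ruby
# Hypothetical stand-in mirroring the buffering logic documented above.
# It is NOT WaterDrop::Clients::Buffered and performs no delivery handling.
class SketchBufferedClient
  attr_reader :messages

  def initialize
    @messages = []
    @topics = Hash.new { |hash, key| hash[key] = [] }
    @transaction_active = false
    @transaction_messages = []
    @transaction_topics = Hash.new { |hash, key| hash[key] = [] }
  end

  def begin_transaction
    @transaction_active = true
  end

  # Stages the message when a transaction is open, buffers it directly otherwise.
  def produce(message)
    if @transaction_active
      @transaction_topics[message.fetch(:topic)] << message
      @transaction_messages << message
    else
      @topics[message.fetch(:topic)] << message
      @messages << message
    end
  end

  # Moves staged messages into the visible buffers.
  def commit_transaction
    @transaction_topics.each { |topic, staged| @topics[topic] += staged }
    @messages += @transaction_messages
    clear_transaction
  end

  # Drops staged messages without touching the visible buffers.
  def abort_transaction
    clear_transaction
  end

  def messages_for(topic)
    @topics[topic]
  end

  private

  def clear_transaction
    @transaction_topics.clear
    @transaction_messages.clear
    @transaction_active = false
  end
end

client = SketchBufferedClient.new
client.produce({ topic: 'events', payload: 'outside' })

client.begin_transaction
client.produce({ topic: 'events', payload: 'staged' })
visible_before_commit = client.messages.size
client.commit_transaction
visible_after_commit = client.messages.size

client.begin_transaction
client.produce({ topic: 'events', payload: 'discarded' })
client.abort_transaction
visible_after_abort = client.messages_for('events').size

puts [visible_before_commit, visible_after_commit, visible_after_abort].inspect
```

In a spec, this staging behaviour is what allows an assertion that nothing was "sent" when the code under test aborts its transaction.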
#reset ⇒ Object
Clears the internal buffer. Used in between specs so messages do not leak out.
    # File 'lib/waterdrop/clients/buffered.rb', line 85
    def reset
      @transaction_level = 0
      @transaction_active = false
      @transaction_topics.clear
      @transaction_messages.clear
      @messages.clear
      @topics.each_value(&:clear)
    end
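Note that `reset` empties each per-topic array with `each_value(&:clear)` instead of clearing the `@topics` hash itself, so topic keys remain and the arrays are emptied in place. A small plain-Ruby sketch of that behaviour follows; the `topics` and `held` variables are illustrative and stand in for the client's internal state.

```ruby
# Plain-Ruby illustration of clearing per-key arrays in place.
topics = Hash.new { |hash, key| hash[key] = [] }
topics['events'] << { topic: 'events', payload: 'a' }

# Keep a reference to the array, as a spec helper might.
held = topics['events']

# Same call that the documented reset method applies to its topics hash.
topics.each_value(&:clear)

key_kept = topics.key?('events')             # the topic key is still present
held_emptied = held.empty?                   # the held array was emptied in place
same_object = held.equal?(topics['events'])  # still the very same array object

puts [key_kept, held_emptied, same_object].inspect
```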
#send_offsets_to_transaction(_consumer, _tpl, _timeout) ⇒ Object
Fakes storing the offset in a transactional fashion.
    # File 'lib/waterdrop/clients/buffered.rb', line 65
    def send_offsets_to_transaction(_consumer, _tpl, _timeout)
      nil
    end