Class: WaterDrop::Producer
- Inherits: Object (hierarchy: Object > WaterDrop::Producer)
- Extended by:
- Forwardable
- Includes:
- Karafka::Core::Helpers::Time, Async, Buffer, Sync, Transactions
- Defined in:
- lib/waterdrop/producer.rb,
lib/waterdrop/producer/sync.rb,
lib/waterdrop/producer/async.rb,
lib/waterdrop/producer/buffer.rb,
lib/waterdrop/producer/status.rb,
lib/waterdrop/producer/builder.rb,
lib/waterdrop/producer/variant.rb,
lib/waterdrop/producer/transactions.rb
Overview
Main WaterDrop message producer.
Defined Under Namespace
Modules: Async, Buffer, Sync, Transactions
Classes: Builder, Status, Variant
Instance Attribute Summary
- #config ⇒ Object (readonly): Dry-configurable config object.
- #id ⇒ String (readonly): UUID of the current producer.
- #messages ⇒ Array (readonly): Internal messages buffer.
- #monitor ⇒ Object (readonly): Monitor used for instrumentation.
- #status ⇒ Status (readonly): Producer status object.
Instance Method Summary
- #client ⇒ Rdkafka::Producer: Raw rdkafka producer.
- #close(force: false) ⇒ Object: Flushes the buffers synchronously and closes the producer.
- #close! ⇒ Object: Closes the producer with forced close after timeout, purging any outgoing data.
- #idempotent? ⇒ Boolean: True if the current producer is idempotent.
- #initialize(&block) ⇒ Producer (constructor): Creates a not-yet-configured instance of the producer.
- #middleware ⇒ WaterDrop::Producer::Middleware: Returns and caches the middleware object that may be used.
- #partition_count(topic) ⇒ Integer: Fetches and caches the partition count of a topic.
- #purge ⇒ Object: Purges data from both the buffer queue and the librdkafka queue.
- #setup(&block) ⇒ Object: Sets up the whole configuration and initializes all that is needed.
- #with(**args) ⇒ WaterDrop::Producer::Variant (also: #variant): Builds the variant alteration and returns it.
Methods included from Transactions
#transaction, #transaction?, #transaction_mark_as_consumed, #transactional?
Methods included from Buffer
#buffer, #buffer_many, #flush_async, #flush_sync
Methods included from Async
#produce_async, #produce_many_async
Methods included from Sync
#produce_many_sync, #produce_sync
Constructor Details
#initialize(&block) ⇒ Producer
Creates a not-yet-configured instance of the producer
# File 'lib/waterdrop/producer.rb', line 46

def initialize(&block)
  @operations_in_progress = Helpers::Counter.new
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  @status = Status.new
  @messages = []

  return unless block

  setup(&block)
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns dry-configurable config object.
# File 'lib/waterdrop/producer.rb', line 41

def config
  @config
end
#id ⇒ String (readonly)
Returns the UUID of the current producer.
# File 'lib/waterdrop/producer.rb', line 33

def id
  @id
end
#messages ⇒ Array (readonly)
Returns internal messages buffer.
# File 'lib/waterdrop/producer.rb', line 37

def messages
  @messages
end
#monitor ⇒ Object (readonly)
Returns the monitor used for instrumentation.
# File 'lib/waterdrop/producer.rb', line 39

def monitor
  @monitor
end
#status ⇒ Status (readonly)
Returns producer status object.
# File 'lib/waterdrop/producer.rb', line 35

def status
  @status
end
Instance Method Details
#client ⇒ Rdkafka::Producer
The client is lazily initialized, taking into account that the process may be forked at any time.
Forking a producer that is already in use is not recommended. When bootstrapping a cluster, it is much better to fork producers that are configured but not yet used.
Returns the raw rdkafka producer.
# File 'lib/waterdrop/producer.rb', line 83

def client
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?

  @connecting_mutex.synchronize do
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    ObjectSpace.define_finalizer(id, proc { close })

    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    @status.connected!
    @monitor.instrument('producer.connected', producer_id: id)
  end

  @client
end
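The fork check in `#client` (a fast path, then a double-checked rebuild under a mutex keyed on the process id) can be illustrated with a standard-library sketch. `ForkAwareHolder`, `FakeClient` and the injectable `pid_source` are hypothetical names introduced only so the behaviour can be exercised without actually forking; the real producer calls `Process.pid` directly and builds an rdkafka client through `Builder`.

```ruby
# Hypothetical stand-in for the native rdkafka client handle.
FakeClient = Struct.new(:pid)

# Sketch of a fork-aware, lazily built client holder (not WaterDrop's API).
class ForkAwareHolder
  class UsedInParentProcessError < StandardError; end

  # pid_source is injectable (an assumption of this sketch) so a fork can be simulated.
  def initialize(pid_source: -> { Process.pid })
    @pid_source = pid_source
    @connecting_mutex = Mutex.new
    @client = nil
    @pid = nil
  end

  def client
    # Fast path: reuse the client only when it was built in this very process
    return @client if @client && @pid == @pid_source.call

    @connecting_mutex.synchronize do
      # Re-check under the lock, another thread may have built it meanwhile
      return @client if @client && @pid == @pid_source.call

      if @client
        # The client belongs to the parent process: drop the reference and fail loudly
        @client = nil
        raise UsedInParentProcessError, "client was built in pid #{@pid}"
      end

      @pid = @pid_source.call
      @client = FakeClient.new(@pid)
    end
  end
end
```

As in the listing, the stale reference is cleared before raising, so a later call in the child process builds a fresh client instead of reusing the parent's.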
#close(force: false) ⇒ Object
Flushes the buffers synchronously and closes the producer
# File 'lib/waterdrop/producer.rb', line 182

def close(force: false)
  @operating_mutex.synchronize do
    return unless @status.active?

    @monitor.instrument(
      'producer.closed',
      producer_id: id
    ) do
      @status.closing!
      @monitor.instrument('producer.closing', producer_id: id)

      # No need for auto-gc if everything got closed by us
      # This should be used only in case a producer was not closed properly and forgotten
      ObjectSpace.undefine_finalizer(id)

      # We save this thread id because we need to bypass the activity verification on the
      # producer for final flush of buffers.
      @closing_thread_id = Thread.current.object_id

      # Wait until all the outgoing operations are done. Only when no one is using the
      # underlying client running operations we can close
      sleep(0.001) until @operations_in_progress.value.zero?

      # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
      # as we close the client after the flushing (even if blocked by the mutex)
      flush(true)

      # We should not close the client in several threads the same time
      # It is safe to run it several times but not exactly the same moment
      # We also mark it as closed only if it was connected, if not, it would trigger a new
      # connection that anyhow would be immediately closed
      if @client
        # Why do we trigger it early instead of just having `#close` do it?
        # The linger.ms time will be ignored for the duration of the call,
        # queued messages will be sent to the broker as soon as possible.
        begin
          @client.flush(current_variant.max_wait_timeout) unless @client.closed?
        # We can safely ignore timeouts here because any left outstanding requests
        # will anyhow force wait on close if not forced.
        # If forced, we will purge the queue and just close
        rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
          nil
        ensure
          # Purge fully the local queue in case of a forceful shutdown just to be sure, that
          # there are no dangling messages. In case flush was successful, there should be
          # none but we do it just in case it timed out
          purge if force
        end

        @client.close

        @client = nil
      end

      # Remove callbacks runners that were registered
      ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

      @status.closed!
    end
  end
end
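The shutdown ordering in `#close` (guard against double close, mark the producer as closing, wait for in-flight operations to drain, then flush and mark it closed) can be sketched with the standard library alone. `OpsCounter` and `ClosableProducerSketch` are hypothetical; the real producer uses `Helpers::Counter`, its `Status` object and librdkafka flushing, none of which are reproduced here.

```ruby
# Minimal thread-safe counter, a hypothetical stand-in for the producer's
# in-progress operations counter.
class OpsCounter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @value += 1 }
  end

  def decrement
    @mutex.synchronize { @value -= 1 }
  end

  def value
    @mutex.synchronize { @value }
  end
end

# Hypothetical producer-like object illustrating the close ordering only.
class ClosableProducerSketch
  attr_reader :status, :events

  def initialize
    @operating_mutex = Mutex.new
    @ops = OpsCounter.new
    @status = :connected
    @events = []
  end

  # Wraps a unit of work so that close can wait for it to finish
  def operate
    @ops.increment
    yield
  ensure
    @ops.decrement
  end

  def close
    @operating_mutex.synchronize do
      # Closing twice (or closing a non-active producer) is a no-op
      return unless @status == :connected

      @status = :closing
      # Busy-wait until no operation is using the underlying client
      sleep(0.001) until @ops.value.zero?
      @events << :flushed
      @status = :closed
    end
  end
end
```

The polling loop mirrors the `sleep(0.001) until ...` wait in the listing; a condition variable would avoid the polling at the cost of more bookkeeping around every operation.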
#close! ⇒ Object
Closes the producer with forced close after timeout, purging any outgoing data
# File 'lib/waterdrop/producer.rb', line 247

def close!
  close(force: true)
end
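`#close!` differs from `#close` only by passing `force: true`, which matters in the `begin`/`rescue`/`ensure` branch of `#close`: a flush timeout is swallowed, and queued data is purged only for a forced close. The standard-library sketch below reproduces that control flow. `SketchClient` and `shutdown` are hypothetical, and `Timeout::Error` stands in for rdkafka's `WaitTimeoutError`.

```ruby
require 'timeout'

# Hypothetical queue-backed client used only to illustrate the control flow.
class SketchClient
  attr_reader :queue

  def initialize(queue)
    @queue = queue
    @closed = false
  end

  # Simulates a flush that times out whenever undelivered messages remain
  def flush(_timeout_ms)
    raise Timeout::Error, 'flush timed out' unless @queue.empty?
  end

  def purge
    @queue.clear
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

def shutdown(client, force:)
  begin
    client.flush(5_000) unless client.closed?
  rescue Timeout::Error
    # A timeout is tolerated; in a non-forced close the queued data is kept
    nil
  ensure
    # Only a forced shutdown drops whatever is still queued
    client.purge if force
  end

  client.close
end
```

Putting the purge in `ensure` guarantees it runs for a forced close whether the flush succeeded, timed out or raised something else.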
#idempotent? ⇒ Boolean
Returns true if the current producer is idempotent.
# File 'lib/waterdrop/producer.rb', line 165

def idempotent?
  # Every transactional producer is idempotent by default always
  return true if transactional?
  return @idempotent if instance_variable_defined?(:'@idempotent')

  @idempotent = config.kafka.to_h.key?(:'enable.idempotence')
end
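`#idempotent?` caches its answer with `instance_variable_defined?` rather than `||=`, because `@idempotent ||= ...` would redo the lookup every time the cached value is `false`. The class below is a hypothetical sketch of that memoization; like the listing, it only checks whether the `enable.idempotence` key is present, not its value.

```ruby
# Hypothetical sketch of falsy-safe memoization (not WaterDrop's API).
class IdempotenceCheck
  attr_reader :lookups

  def initialize(kafka_settings, transactional: false)
    @kafka_settings = kafka_settings
    @transactional = transactional
    @lookups = 0
  end

  def transactional?
    @transactional
  end

  def idempotent?
    # Transactional producers are always idempotent
    return true if transactional?
    # `@idempotent ||= ...` would repeat the lookup whenever the cached answer
    # is false, so the presence of the ivar is checked instead of its value
    return @idempotent if instance_variable_defined?(:@idempotent)

    @lookups += 1
    # Like the listing, this checks only that the key is present
    @idempotent = @kafka_settings.key?(:'enable.idempotence')
  end
end
```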
#middleware ⇒ WaterDrop::Producer::Middleware
Returns and caches the middleware object that may be used
# File 'lib/waterdrop/producer.rb', line 175

def middleware
  @middleware ||= config.middleware
end
#partition_count(topic) ⇒ Integer
Fetches and caches the partition count of a topic.
It uses the underlying rdkafka-ruby partition count fetch and cache.
# File 'lib/waterdrop/producer.rb', line 128

def partition_count(topic)
  client.partition_count(topic.to_s)
end
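The listing delegates caching to rdkafka-ruby. The general idea of a per-topic cache with expiry can be sketched as below; `PartitionCountCache`, the 30-second TTL and the injectable clock are assumptions for illustration and do not describe rdkafka-ruby's actual cache. Normalizing the topic with `to_s`, as the listing does, lets `:events` and `'events'` share one entry.

```ruby
# Hypothetical per-topic cache with a time-to-live (not rdkafka-ruby's implementation).
class PartitionCountCache
  def initialize(ttl_seconds: 30,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 &fetcher)
    @ttl = ttl_seconds
    @clock = clock
    @fetcher = fetcher
    @entries = {}
    @mutex = Mutex.new
  end

  def partition_count(topic)
    # Symbols and strings map to the same cache entry
    name = topic.to_s

    @mutex.synchronize do
      count, fetched_at = @entries[name]
      now = @clock.call

      # Fetch on a miss or once the cached value is older than the TTL
      if count.nil? || now - fetched_at > @ttl
        count = @fetcher.call(name)
        @entries[name] = [count, now]
      end

      count
    end
  end
end
```

A monotonic clock is used by default so that wall-clock adjustments cannot make entries expire early or live forever.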
#purge ⇒ Object
This operation can cause data loss. It purges not only the internal WaterDrop buffer but also the librdkafka queue, and it cancels any outgoing message dispatches.
Purges data from both the buffer queue and the librdkafka queue.
# File 'lib/waterdrop/producer.rb', line 137

def purge
  @monitor.instrument('buffer.purged', producer_id: id) do
    @buffer_mutex.synchronize do
      @messages = []
    end

    # We should not purge if there is no client initialized
    # It may not be initialized if we created a new producer that never connected to kafka,
    # we used buffer and purged. In cases like this client won't exist
    @connecting_mutex.synchronize do
      @client&.purge
    end
  end
end
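The nil-safe purge in the listing can be reproduced with two mutexes and Ruby's safe navigation operator. `PurgeSketch` and `RecordingClient` are hypothetical, and the `@monitor` instrumentation is left out.

```ruby
# Hypothetical sketch of purging a local buffer plus an optional client.
class PurgeSketch
  attr_reader :messages
  attr_accessor :client

  def initialize
    @buffer_mutex = Mutex.new
    @connecting_mutex = Mutex.new
    @messages = []
    @client = nil
  end

  def buffer(message)
    @buffer_mutex.synchronize { @messages << message }
  end

  def purge
    # Replace the buffer with a fresh array, as the listing does
    @buffer_mutex.synchronize { @messages = [] }

    # A producer that never connected has no client, so `&.` turns the
    # client-side purge into a no-op instead of raising NoMethodError
    @connecting_mutex.synchronize { @client&.purge }
  end
end

# Hypothetical client that records whether purge was called.
class RecordingClient
  attr_reader :purged

  def initialize
    @purged = false
  end

  def purge
    @purged = true
  end
end
```

Because the buffer is reassigned rather than cleared in place, an array previously obtained through `#messages` keeps its old contents after a purge.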
#setup(&block) ⇒ Object
Sets up the whole configuration and initializes all that is needed
# File 'lib/waterdrop/producer.rb', line 63

def setup(&block)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  @config = Config
            .new
            .setup(&block)
            .config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @default_variant = Variant.new(self, default: true)
  @status.configured!
end
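Together with the constructor, `#setup` gives a two-step lifecycle: the block passed to `new` is optional, and configuration is accepted exactly once. The sketch below shows that pattern without WaterDrop. `LifecycleSketch` and its `Settings` struct are hypothetical; the real producer uses a dry-configurable `Config`, takes its id from that config and tracks state in a `Status` object. Generating the id with `SecureRandom.uuid` is an assumption of the sketch.

```ruby
require 'securerandom'

# Hypothetical sketch of the initial -> configured lifecycle.
class LifecycleSketch
  class AlreadyConfiguredError < StandardError; end

  Settings = Struct.new(:id, :kafka)

  attr_reader :status, :settings

  def initialize(&block)
    @status = :initial
    # Configuration is optional at construction time
    return unless block

    setup(&block)
  end

  def setup
    # A producer can be configured only once
    raise AlreadyConfiguredError, @settings.id unless @status == :initial

    @settings = Settings.new(SecureRandom.uuid, {})
    yield(@settings)
    @status = :configured
  end
end
```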
#with(**args) ⇒ WaterDrop::Producer::Variant Also known as: variant
Builds the variant alteration and returns it.
# File 'lib/waterdrop/producer.rb', line 156

def with(**args)
  ensure_active!

  Variant.new(self, **args)
end
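`#with` (aliased as `#variant`) returns a lightweight object bound to the same producer and refuses to do so once the producer is no longer active. The sketch below shows that delegation pattern only; `SketchProducer`, `SketchVariant`, the `max_wait_timeout` default and the error class are hypothetical and do not mirror the actual `Variant` implementation or the arguments it accepts.

```ruby
# Hypothetical producer that hands out per-call setting overrides.
class SketchProducer
  class ProducerClosedError < StandardError; end

  # Assumed default, for illustration only
  DEFAULTS = { max_wait_timeout: 5_000 }.freeze

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def with(**overrides)
    ensure_active!
    SketchVariant.new(self, **overrides)
  end

  alias variant with

  private

  def ensure_active!
    raise ProducerClosedError, 'producer is closed' if @closed
  end
end

# Hypothetical variant: shares the producer, overrides only some settings.
class SketchVariant
  attr_reader :producer

  def initialize(producer, **overrides)
    @producer = producer
    # Unspecified settings fall back to the producer-level defaults
    @settings = SketchProducer::DEFAULTS.merge(overrides)
  end

  def max_wait_timeout
    @settings.fetch(:max_wait_timeout)
  end
end
```

Because a variant only holds a reference to the producer plus a small settings hash, creating one is cheap and does not open a second connection.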