Class: WaterDrop::Producer::Variant
- Inherits: SimpleDelegator
  - Object
  - SimpleDelegator
  - WaterDrop::Producer::Variant
- Defined in: lib/waterdrop/producer/variant.rb
Overview
Not all settings are alterable. Only settings that are safe to change, because they have no impact on the producer itself, can be altered. If there is a setting you consider important and want to make alterable, please open a GitHub issue for evaluation.
Please be aware that variant changes also affect buffers. Buffers are shared because they exist at the producer level, so overriding max_wait_timeout may impact flushing.
topic_config is validated when it is created for the first time during message production. This means that a configuration error may be raised only during dispatch. This cannot be avoided, since a librdkafka instance is needed to create the references.
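The practical consequence of deferred validation is that a bad topic_config goes unnoticed when the variant is built and only fails on first use. The following is a minimal sketch of that behavior in plain Ruby. LazyTopicRegistry, InvalidTopicConfig, and the KNOWN_KEYS subset are hypothetical names invented for illustration; they are not WaterDrop or librdkafka classes.

```ruby
# Hypothetical error class, not part of WaterDrop.
class InvalidTopicConfig < StandardError; end

# Hypothetical registry that builds per-topic handles lazily.
class LazyTopicRegistry
  # Illustrative subset of topic-level keys, assumed for this sketch only.
  KNOWN_KEYS = %w[acks request.timeout.ms message.timeout.ms].freeze

  def initialize
    @handles = {}
  end

  # Validates and caches a handle the first time a topic/config pair is used.
  def handle_for(topic, topic_config)
    @handles[[topic, topic_config]] ||= build_handle(topic, topic_config)
  end

  private

  def build_handle(topic, topic_config)
    unknown = topic_config.keys.map(&:to_s) - KNOWN_KEYS
    raise InvalidTopicConfig, "unknown keys: #{unknown.join(', ')}" unless unknown.empty?

    "#{topic}-handle" # placeholder for a real topic reference
  end
end

registry = LazyTopicRegistry.new
bad_config = { 'ackz' => 1 } # the typo goes unnoticed until first use

# The error surfaces only here, at "dispatch" time.
error =
  begin
    registry.handle_for('events', bad_config)
    nil
  rescue InvalidTopicConfig => e
    e
  end
```

In code that uses variants, this suggests handling configuration errors around the first dispatch rather than around variant creation.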
Object that acts as a proxy allowing for alteration of certain low-level per-topic configuration and some other settings that users may find useful to alter, without having to create new producers with their underlying librdkafka instances.
Since each librdkafka instance creates at least one TCP connection per broker, creating separate producers just to alter a setting like acks may be inefficient and may lead to excessive use of TCP connections, especially in bigger clusters.
This variant object allows for “wrapping” of the producer with alteration of those settings in such a way, that two or more alterations can co-exist and share the same producer, effectively sharing the librdkafka client.
Since this is an enhanced SimpleDelegator, all WaterDrop::Producer APIs are preserved and a variant can be used as a regular producer. The only important thing to remember is to close it only once.
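The wrapping idea can be sketched with Ruby's standard SimpleDelegator alone. This is a simplified illustration, not WaterDrop's implementation: FakeProducer and FakeVariant are hypothetical stand-ins, and the way topic_config is passed to produce is an assumption made for the example.

```ruby
require 'delegate'

# Hypothetical stand-in for a producer that owns one expensive client.
class FakeProducer
  attr_reader :sent, :close_count

  def initialize
    @sent = []
    @close_count = 0
  end

  # Records the message together with the per-call topic settings.
  def produce(topic:, payload:, topic_config: {})
    @sent << { topic: topic, payload: payload, topic_config: topic_config }
  end

  def close
    @close_count += 1
  end
end

# Hypothetical variant: delegates everything to the producer and only
# changes which topic settings are attached to each message.
class FakeVariant < SimpleDelegator
  attr_reader :topic_config

  def initialize(producer, topic_config: {})
    @topic_config = topic_config
    super(producer)
  end

  def produce(topic:, payload:)
    __getobj__.produce(topic: topic, payload: payload, topic_config: @topic_config)
  end
end

producer = FakeProducer.new
fast = FakeVariant.new(producer, topic_config: { acks: 1 })
safe = FakeVariant.new(producer, topic_config: { acks: 'all' })

fast.produce(topic: 'logs', payload: 'a')
safe.produce(topic: 'payments', payload: 'b')

# Both variants wrote through the same underlying producer, and closing
# through any variant closes that shared producer, so it is done once.
safe.close
```

Because unknown methods fall through to the wrapped object, each variant behaves like the producer itself while carrying its own settings.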
Instance Attribute Summary
- #max_wait_timeout ⇒ Object (readonly)
  Returns the value of attribute max_wait_timeout.
- #producer ⇒ Object (readonly)
  Returns the value of attribute producer.
- #topic_config ⇒ Object (readonly)
  Returns the value of attribute topic_config.
Instance Method Summary
- #default? ⇒ Boolean
  Whether this is the default variant for this producer.
- #initialize(producer, max_wait_timeout: producer.config.max_wait_timeout, topic_config: EMPTY_HASH, default: false) ⇒ Variant (constructor)
  A new instance of Variant.
Constructor Details
#initialize(producer, max_wait_timeout: producer.config.max_wait_timeout, topic_config: EMPTY_HASH, default: false) ⇒ Variant
Returns a new instance of Variant.
    # File 'lib/waterdrop/producer/variant.rb', line 47
    def initialize(
      producer,
      max_wait_timeout: producer.config.max_wait_timeout,
      topic_config: EMPTY_HASH,
      default: false
    )
      @producer = producer
      @max_wait_timeout = max_wait_timeout
      @topic_config = topic_config
      @default = default
      super(producer)

      Contracts::Variant.new.validate!(to_h, Errors::VariantInvalidError)
    end
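The constructor's max_wait_timeout default is taken from the producer's own configuration, which works because Ruby evaluates keyword defaults at call time and lets them reference earlier arguments. A small sketch of that mechanism, assuming hypothetical StubConfig, StubProducer, and StubVariant stand-ins rather than the real WaterDrop classes (contract validation is omitted):

```ruby
require 'delegate'

# Hypothetical stand-ins for a producer and its config object.
StubConfig = Struct.new(:max_wait_timeout)
StubProducer = Struct.new(:config)

class StubVariant < SimpleDelegator
  attr_reader :max_wait_timeout, :topic_config

  # The keyword default is evaluated per call and may read the
  # positional `producer` argument that precedes it.
  def initialize(
    producer,
    max_wait_timeout: producer.config.max_wait_timeout,
    topic_config: {}.freeze,
    default: false
  )
    @max_wait_timeout = max_wait_timeout
    @topic_config = topic_config
    @default = default
    super(producer)
  end

  def default?
    @default
  end
end

producer = StubProducer.new(StubConfig.new(5_000))

# Without overrides, the variant inherits the producer-level value.
inherited = StubVariant.new(producer)

# With overrides, only the variant's own copy of the setting changes.
custom = StubVariant.new(producer, max_wait_timeout: 1_000, topic_config: { 'acks' => 'all' })
```

Here `inherited.max_wait_timeout` mirrors the producer's value while `custom` carries its own, and the producer's config object is left untouched in both cases.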
Instance Attribute Details
#max_wait_timeout ⇒ Object (readonly)
Returns the value of attribute max_wait_timeout.
    # File 'lib/waterdrop/producer/variant.rb', line 39
    def max_wait_timeout
      @max_wait_timeout
    end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
    # File 'lib/waterdrop/producer/variant.rb', line 39
    def producer
      @producer
    end
#topic_config ⇒ Object (readonly)
Returns the value of attribute topic_config.
    # File 'lib/waterdrop/producer/variant.rb', line 39
    def topic_config
      @topic_config
    end
Instance Method Details
#default? ⇒ Boolean
Returns whether this is the default variant for this producer.
    # File 'lib/waterdrop/producer/variant.rb', line 63
    def default?
      @default
    end