Class: WaterDrop::Producer::Variant

Inherits:
SimpleDelegator
  • Object
show all
Defined in:
lib/waterdrop/producer/variant.rb

Overview

Note:

Not all settings are alterable. We only allow to alter things that are safe to be altered as they have no impact on the producer. If there is a setting you consider important and want to make it alterable, please open a GH issue for evaluation.

Note:

Please be aware, that variant changes also affect buffers. If you overwrite the max_wait_timeout, since buffers are shared (as they exist on producer level), flushing may be impacted.

Note:

topic_config is validated when created for the first time during message production. This means, that configuration error may be raised only during dispatch. There is no way out of this, since we need librdkafka instance to create the references.

Object that acts as a proxy allowing for alteration of certain low-level per-topic configuration and some other settings that users may find useful to alter, without having to create new producers with their underlying librdkafka instances.

Since each librdkafka instance creates at least one TCP connection per broker, creating separate objects just to alter thing like acks may not be efficient and may lead to extensive usage of TCP connections, especially in bigger clusters.

This variant object allows for “wrapping” of the producer with alteration of those settings in such a way, that two or more alterations can co-exist and share the same producer, effectively sharing the librdkafka client.

Since this is an enhanced SimpleDelegator all WaterDrop::Producer APIs are preserved and a variant alteration can be used as a regular producer. The only important thing is to remember to only close it once.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(producer, max_wait_timeout: producer.config.max_wait_timeout, topic_config: EMPTY_HASH, default: false) ⇒ Variant

Returns a new instance of Variant.

Parameters:

  • producer (WaterDrop::Producer)

    producer for which we want to have a variant

  • max_wait_timeout (Integer, nil) (defaults to: producer.config.max_wait_timeout)

    alteration to max wait timeout or nil to use default

  • topic_config (Hash) (defaults to: EMPTY_HASH)

    extra topic configuration that can be altered.

  • default (Boolean) (defaults to: false)

    is this a default variant or an altered one

See Also:



47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/waterdrop/producer/variant.rb', line 47

def initialize(
  producer,
  max_wait_timeout: producer.config.max_wait_timeout,
  topic_config: EMPTY_HASH,
  default: false
)
  @producer = producer
  @max_wait_timeout = max_wait_timeout
  @topic_config = topic_config
  @default = default
  super(producer)

  Contracts::Variant.new.validate!(to_h, Errors::VariantInvalidError)
end

Instance Attribute Details

#max_wait_timeoutObject (readonly)

Returns the value of attribute max_wait_timeout.



39
40
41
# File 'lib/waterdrop/producer/variant.rb', line 39

def max_wait_timeout
  @max_wait_timeout
end

#producerObject (readonly)

Returns the value of attribute producer.



39
40
41
# File 'lib/waterdrop/producer/variant.rb', line 39

def producer
  @producer
end

#topic_configObject (readonly)

Returns the value of attribute topic_config.



39
40
41
# File 'lib/waterdrop/producer/variant.rb', line 39

def topic_config
  @topic_config
end

Instance Method Details

#default?Boolean

Returns is this a default variant for this producer.

Returns:

  • (Boolean)

    is this a default variant for this producer



63
64
65
# File 'lib/waterdrop/producer/variant.rb', line 63

def default?
  @default
end