Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic
- Defined in:
- lib/karafka/pro/routing/features/virtual_partitions/topic.rb
Overview
Topic extensions to be able to manage virtual partitions feature
Instance Method Summary collapse
-
#to_h ⇒ Hash
Topic with all its native configuration options plus manual offset management namespace settings.
-
#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions
Method that allows to set the virtual partitions details during the routing configuration and then allows to retrieve it.
-
#virtual_partitions? ⇒ Boolean
Are virtual partitions enabled for given topic.
Instance Method Details
#to_h ⇒ Hash
Returns topic with all its native configuration options plus manual offset management namespace settings.
49 50 51 52 53 |
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 49 def to_h super.merge( virtual_partitions: virtual_partitions.to_h ).freeze end |
#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions
Returns method that allows to set the virtual partitions details during the routing configuration and then allows to retrieve it.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 25 def virtual_partitions( max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil ) @virtual_partitions ||= Config.new( active: !partitioner.nil?, max_partitions: max_partitions, partitioner: partitioner, offset_metadata_strategy: , # If no reducer provided, we use this one. It just runs a modulo on the sum of # a stringified version, providing fairly good distribution. reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions } ) end |
#virtual_partitions? ⇒ Boolean
Returns are virtual partitions enabled for given topic.
43 44 45 |
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 43 def virtual_partitions? virtual_partitions.active? end |