Class: Karafka::Pro::Routing::Features::Swarm::Contracts::Routing

Inherits:
Contracts::Base
  • Object
show all
Defined in:
lib/karafka/pro/routing/features/swarm/contracts/routing.rb

Overview

Special contract that validates prior to starting swarm that each node has at least one assignment.

It is special because we cannot run it during routing definitions, because we can only run it when all routes are defined and full context is available.

This is why we use it before warmup when everything is expected to be configured.

Instance Method Summary collapse

Instance Method Details

#validate!(builder) ⇒ Object

Validates that each node has at least one assignment.

Parameters:



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/karafka/pro/routing/features/swarm/contracts/routing.rb', line 31

def validate!(builder)
  nodes_setup = Hash.new do |h, node_id|
    h[node_id] = { active: false, node_id: node_id }
  end

  # Initialize nodes in the hash so we can iterate over them
  App.config.swarm.nodes.times { |node_id| nodes_setup[node_id] }
  nodes_setup.freeze

  builder.each do |consumer_group|
    consumer_group.topics.each do |topic|
      nodes_setup.each do |node_id, details|
        next unless topic.active?
        next unless topic.swarm.nodes.include?(node_id)

        details[:active] = true
      end
    end
  end

  nodes_setup.each_value do |details|
    super(details)
  end
end