Class: Karafka::Routing::Builder

Inherits:
Array
  • Object
show all
Defined in:
lib/karafka/routing/builder.rb

Overview

Note:

We lock the access just in case this is used in patterns. The locks here do not have any impact on routing usage unless being expanded, so no race conditions risks.

Builder used as a DSL layer for building consumers and telling them which topics to consume

Examples:

Build a simple (most common) route

consumers do
  topic :new_videos do
    consumer NewVideosConsumer
  end
end

Instance Method Summary collapse

Constructor Details

#initializeBuilder

Returns a new instance of Builder.



26
27
28
29
30
31
# File 'lib/karafka/routing/builder.rb', line 26

def initialize
  @mutex = Mutex.new
  @draws = []
  @defaults = EMPTY_DEFAULTS
  super
end

Instance Method Details

#activeArray<Karafka::Routing::ConsumerGroup>

Returns only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context.

Returns:

  • (Array<Karafka::Routing::ConsumerGroup>)

    only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context



73
74
75
# File 'lib/karafka/routing/builder.rb', line 73

def active
  select(&:active?)
end

#clearObject

Clears the builder and the draws memory



78
79
80
81
82
83
84
# File 'lib/karafka/routing/builder.rb', line 78

def clear
  @mutex.synchronize do
    @defaults = EMPTY_DEFAULTS
    @draws.clear
    super
  end
end

#defaults(&block) ⇒ Proc

Returns defaults that should be evaluated per topic.

Parameters:

  • block (Proc)

    block with per-topic evaluated defaults

Returns:

  • (Proc)

    defaults that should be evaluated per topic



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/karafka/routing/builder.rb', line 88

def defaults(&block)
  return @defaults unless block

  if @mutex.owned?
    @defaults = block
  else
    @mutex.synchronize do
      @defaults = block
    end
  end
end

#draw(&block) { ... } ⇒ Object

Note:

After it is done drawing it will store and validate all the routes to make sure that they are correct and that there are no topic/group duplications (this is forbidden)

Used to draw routes for Karafka

Examples:

draw do
  topic :xyz do
  end
end

Parameters:

  • block (Proc)

    block we will evaluate within the builder context

Yields:

  • Evaluates provided block in a builder context so we can describe routes

Raises:



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/karafka/routing/builder.rb', line 45

def draw(&block)
  @mutex.synchronize do
    @draws << block

    instance_eval(&block)

    # Ensures high-level routing details consistency
    # Contains checks that require knowledge about all the consumer groups to operate
    Contracts::Routing.new.validate!(map(&:to_h))

    each do |consumer_group|
      # Validate consumer group settings
      Contracts::ConsumerGroup.new.validate!(consumer_group.to_h)

      # and then its topics settings
      consumer_group.topics.each do |topic|
        Contracts::Topic.new.validate!(topic.to_h)
      end

      # Initialize subscription groups after all the routing is done
      consumer_group.subscription_groups
    end
  end
end