Class: Karafka::Routing::Builder

Inherits:
Array
  • Object
Defined in:
lib/karafka/routing/builder.rb

Overview

Note:

Access is locked in case the builder is used with patterns. The locks have no impact on routing usage unless the routes are being expanded, so there is no risk of race conditions.

Builder used as a DSL layer for building consumers and telling them which topics to consume.

Examples:

Build a simple (most common) route

consumers do
  topic :new_videos do
    consumer NewVideosConsumer
  end
end


Constructor Details

#initialize ⇒ Builder

Returns a new instance of Builder.



# File 'lib/karafka/routing/builder.rb', line 26

def initialize
  @mutex = Mutex.new
  @draws = []
  @defaults = EMPTY_DEFAULTS
  super
end

Instance Method Details

#active ⇒ Array<Karafka::Routing::ConsumerGroup>

Returns only the active consumer groups. Since Karafka supports a multi-process setup, we need to be able to pick only those consumer groups that should be active in the given process context.

Returns:

  • (Array<Karafka::Routing::ConsumerGroup>)

    only the consumer groups that should be active in the given process context



# File 'lib/karafka/routing/builder.rb', line 96

def active
  select(&:active?)
end
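The filtering above can be tried without Karafka installed. The following is a minimal sketch, not the library's implementation: `FakeGroup` and `FakeBuilder` are hypothetical stand-ins, and the only behavior assumed of a consumer group is that it responds to `active?`.

```ruby
# Hypothetical stand-in for Karafka::Routing::ConsumerGroup.
# Only the #active? predicate matters for this illustration.
FakeGroup = Struct.new(:name, :active) do
  def active?
    active
  end
end

# Hypothetical Array subclass that mirrors the shape of Builder#active.
class FakeBuilder < Array
  def active
    # Keep only the elements whose #active? returns a truthy value
    select(&:active?)
  end
end

builder = FakeBuilder.new
builder << FakeGroup.new('videos', true)
builder << FakeGroup.new('audits', false)

# Names of the groups that would be used in this process
active_names = builder.active.map(&:name)
```

Because the builder is itself an array of consumer groups, the filter is a plain `select` and does not modify the builder.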

#clear ⇒ Object

Clears the builder and the stored draws, and resets the defaults to EMPTY_DEFAULTS.



# File 'lib/karafka/routing/builder.rb', line 101

def clear
  @mutex.synchronize do
    @defaults = EMPTY_DEFAULTS
    @draws.clear
    array_clear
  end
end

#defaults(&block) ⇒ Proc

When called without a block, returns the defaults that should be evaluated per topic. When called with a block, stores that block as the new defaults.

Parameters:

  • block (Proc)

    block with per-topic evaluated defaults

Returns:

  • (Proc)

    defaults that should be evaluated per topic



# File 'lib/karafka/routing/builder.rb', line 111

def defaults(&block)
  return @defaults unless block

  if @mutex.owned?
    @defaults = block
  else
    @mutex.synchronize do
      @defaults = block
    end
  end
end
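The `@mutex.owned?` branch matters because Ruby's `Mutex` is not reentrant: locking it again from the thread that already holds it raises `ThreadError`. The sketch below reproduces only this locking pattern. `LockedSettings` is a hypothetical class written for illustration and is not part of Karafka.

```ruby
# Hypothetical stand-in that reproduces only the locking pattern.
class LockedSettings
  def initialize
    @mutex = Mutex.new
    @defaults = nil
  end

  def defaults(&block)
    # Reader mode: no block given, return what is stored
    return @defaults unless block

    if @mutex.owned?
      # The current thread already holds the lock (we are inside #draw)
      @defaults = block
    else
      @mutex.synchronize { @defaults = block }
    end
  end

  def draw(&block)
    # The block runs with the lock held, so DSL calls see owned? == true
    @mutex.synchronize { instance_eval(&block) }
  end
end

settings = LockedSettings.new
settings.draw do
  defaults { :from_draw }
end
inside_value = settings.defaults.call

# For contrast: nesting synchronize on a plain Mutex in one thread fails
plain = Mutex.new
reentrant_error = begin
  plain.synchronize { plain.synchronize { :never_reached } }
  nil
rescue ThreadError => e
  e
end
```

Checking `owned?` first lets the same method serve both call sites: inside a `draw` block, where the lock is already held, and outside of it, where the lock still has to be acquired.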

#draw(&block) { ... } ⇒ Object

Note:

After drawing is done, it validates all the routes to make sure that they are correct and that there are no topic or group duplications (these are forbidden).

Used to draw routes for Karafka.

Examples:

draw do
  topic :xyz do
  end
end

Parameters:

  • block (Proc)

    block we will evaluate within the builder context

Yields:

  • Evaluates provided block in a builder context so we can describe routes

Raises:



# File 'lib/karafka/routing/builder.rb', line 45

def draw(&block)
  @mutex.synchronize do
    @draws << block

    instance_eval(&block)

    # Ensures high-level routing details consistency
    # Contains checks that require knowledge about all the consumer groups to operate
    Contracts::Routing.new.validate!(
      map(&:to_h),
      scope: %w[routes]
    )

    each do |consumer_group|
      # Validate consumer group settings
      Contracts::ConsumerGroup.new.validate!(
        consumer_group.to_h,
        scope: ['routes', consumer_group.name]
      )

      # and then its topics settings
      consumer_group.topics.each do |topic|
        Contracts::Topic.new.validate!(
          topic.to_h,
          scope: ['routes', consumer_group.name, topic.name]
        )
      end

      # Initialize subscription groups after all the routing is done
      consumer_group.subscription_groups
    end
  end
end
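The core mechanics of the method above (remember the block, evaluate it with `instance_eval` so that DSL calls resolve against the builder, then validate the result) can be sketched in isolation. `TinyRoutes` is a hypothetical class, its uniqueness check is a deliberately simplified substitute for the `Contracts` validations, and `ArgumentError` is an assumption chosen for the sketch, not the error Karafka raises.

```ruby
# Hypothetical, heavily simplified stand-in for the routing builder.
class TinyRoutes < Array
  attr_reader :draws

  def initialize
    @draws = []
    super
  end

  # DSL method that blocks passed to #draw can call without a receiver
  def topic(name)
    self << { topic: name.to_s }
  end

  def draw(&block)
    # Remember the block, then run it with self set to this object
    @draws << block
    instance_eval(&block)

    # Simplified validation step: topic names must be unique
    names = map { |route| route[:topic] }
    raise ArgumentError, 'duplicated topic' unless names.uniq.size == names.size

    self
  end
end

routes = TinyRoutes.new
routes.draw do
  topic :new_videos
end
routes.draw do
  topic :new_audio
end
topic_names = routes.map { |route| route[:topic] }

# A block that declares the same topic twice is rejected
duplicate_error = begin
  TinyRoutes.new.draw do
    topic :xyz
    topic :xyz
  end
  nil
rescue ArgumentError => e
  e
end
```

In this sketch, as in the listing above, calling `draw` more than once adds to the existing routes instead of replacing them, and validation runs over everything collected so far.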

#redraw(&block) ⇒ Object

Clear routes and draw them again with the given block. Helpful for testing purposes.

Parameters:

  • block (Proc)

    block we will evaluate within the builder context



# File 'lib/karafka/routing/builder.rb', line 85

def redraw(&block)
  @mutex.synchronize do
    @draws.clear
    array_clear
  end
  draw(&block)
end
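The difference between drawing and redrawing can be sketched with a small stand-in. `RedrawableRoutes` is hypothetical, and it assumes that `array_clear` is an alias of `Array#clear`, which this page does not show.

```ruby
# Hypothetical stand-in that keeps only the draw/redraw bookkeeping.
class RedrawableRoutes < Array
  # Assumption: array_clear refers to the original Array#clear
  alias array_clear clear

  attr_reader :draws

  def initialize
    @mutex = Mutex.new
    @draws = []
    super
  end

  # DSL method available inside draw/redraw blocks
  def topic(name)
    self << name.to_s
  end

  def draw(&block)
    @mutex.synchronize do
      @draws << block
      instance_eval(&block)
    end
  end

  def redraw(&block)
    # First critical section: forget earlier draws and routes
    @mutex.synchronize do
      @draws.clear
      array_clear
    end
    # Second critical section: draw acquires the lock again on its own
    draw(&block)
  end
end

routes = RedrawableRoutes.new
routes.draw do
  topic :first
end
routes.draw do
  topic :second
end
size_after_two_draws = routes.size

routes.redraw do
  topic :only
end
topics_after_redraw = routes.to_a
draws_after_redraw = routes.draws.size
```

The lock is released between clearing and drawing because `draw` takes the same non-reentrant mutex itself. Unlike `#clear`, the `redraw` listing above does not reset `@defaults`.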