Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb

Overview

Tracker that records incoming partition-related operational requests until they can be executed or become invalid. Requests are stored as they arrive so that they can be executed before polling.

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 22

def initialize
  @mutex = Mutex.new
  @requests = Hash.new { |h, k| h[k] = [] }
end
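
The constructor relies on two standard Ruby behaviours: `Singleton` exposes a single shared instance through `.instance`, and a `Hash` created with a default block stores a fresh array the first time a key is read. The sketch below illustrates both with a hypothetical `SingletonSketch` class that mirrors the constructor; it is not the Karafka class, and the `requests` reader exists only for inspection.

```ruby
require 'singleton'

# Hypothetical stand-in that mirrors the constructor above (not the real Karafka class)
class SingletonSketch
  include Singleton

  # Reader added for illustration only; the documented class does not expose it
  attr_reader :requests

  def initialize
    @mutex = Mutex.new
    # The default block creates and stores an empty array on first access to a key
    @requests = Hash.new { |h, k| h[k] = [] }
  end
end

tracker = SingletonSketch.instance
same = SingletonSketch.instance

# First access to an unknown key stores a fresh array under that key
tracker.requests['sg_1'] << :pause
```

Because the class is a singleton, every caller in the process shares the same request store, which is why access to it is guarded by a mutex.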

Instance Method Details

#<<(command) ⇒ Object

Note:

Requests are accumulated per subscription group because that is the level at which they apply, even for partition-related requests.

Adds the given command into the tracker so it can be retrieved when needed.

Parameters:

  • command (Request)

    command we want to schedule

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 32

def <<(command)
  @mutex.synchronize do
    @requests[command[:subscription_group_id]] << command
  end
end
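
As a rough illustration of the grouping described above, the sketch below appends a few commands and counts what is pending per subscription group. `AppendSketch`, the `pending_count` helper, and the hash keys other than `:subscription_group_id` are assumptions made for this example; plain hashes stand in for `Request` objects, which only need to respond to `[]` here.

```ruby
# Hypothetical stand-in for the tracker; plain hashes stand in for Request objects
class AppendSketch
  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
  end

  # Same body as the documented method: group commands by subscription group id
  def <<(command)
    @mutex.synchronize do
      @requests[command[:subscription_group_id]] << command
    end
  end

  # Inspection helper for this example only; not part of the documented API
  def pending_count(subscription_group_id)
    @mutex.synchronize { @requests.fetch(subscription_group_id, []).size }
  end
end

tracker = AppendSketch.new
tracker << { subscription_group_id: 'sg_a', name: 'pause', partition_id: 0 }
tracker << { subscription_group_id: 'sg_a', name: 'resume', partition_id: 0 }
tracker << { subscription_group_id: 'sg_b', name: 'seek', partition_id: 3 }

puts tracker.pending_count('sg_a')
puts tracker.pending_count('sg_b')
```

Note that `<<` returns the value of the `synchronize` block, which is the underlying array, so chaining a second `<<` onto the result would append to that array without taking the lock.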

#each_for(subscription_group_id, &block) {|given| ... } ⇒ Object

Selects all pending command requests for the given subscription group and iterates over them. The selected requests are removed from the tracker before iteration begins, so each one is yielded only once.

Parameters:

  • subscription_group_id (String)

id of the subscription group for which we want to get all the requests. Subscription group ids (not names) are unique within the application, which is unique enough for this purpose.

  • block (Proc)

Yield Parameters:

  • given (Request)

    command request for the requested subscription group

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 47

def each_for(subscription_group_id, &block)
  requests = nil

  @mutex.synchronize do
    requests = @requests.delete(subscription_group_id)
  end

  (requests || EMPTY_ARRAY).each(&block)
end
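
The drain behaviour of `each_for` can be sketched as follows. `DrainSketch` is a hypothetical stand-in, plain hashes stand in for `Request` objects, and `EMPTY_ARRAY` is defined locally as a frozen array on the assumption that the real constant is provided elsewhere in Karafka.

```ruby
# Hypothetical stand-in for the tracker; plain hashes stand in for Request objects
class DrainSketch
  # Assumption: the real EMPTY_ARRAY constant is defined elsewhere in Karafka
  EMPTY_ARRAY = [].freeze

  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
  end

  def <<(command)
    @mutex.synchronize do
      @requests[command[:subscription_group_id]] << command
    end
  end

  def each_for(subscription_group_id, &block)
    requests = nil

    # Detach the whole batch under the lock; Hash#delete returns nil for a missing key
    @mutex.synchronize do
      requests = @requests.delete(subscription_group_id)
    end

    # Iterate outside the lock, so the block may safely call << again
    (requests || EMPTY_ARRAY).each(&block)
  end
end

tracker = DrainSketch.new
tracker << { subscription_group_id: 'sg_a', name: 'pause' }
tracker << { subscription_group_id: 'sg_a', name: 'resume' }

first_pass = []
tracker.each_for('sg_a') { |request| first_pass << request[:name] }

# The batch was removed by the first call, so nothing is yielded here
second_pass = []
tracker.each_for('sg_a') { |request| second_pass << request[:name] }
```

One consequence of iterating outside the `synchronize` block is that a yielded block can add new requests to the tracker without hitting Ruby's recursive-locking error, since `Mutex` is not reentrant.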