Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb

Overview

Tracker used to record incoming partition related operational requests until they are executable or invalid. It stores the requests as they come for execution pre-polling.

Instance Method Summary collapse

Constructor Details

#initializeTracker

Returns a new instance of Tracker.



22
23
24
25
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 22

def initialize
  @mutex = Mutex.new
  @requests = Hash.new { |h, k| h[k] = [] }
end

Instance Method Details

#<<(command) ⇒ Object

Note:

We accumulate requests per subscription group because this is the layer of applicability of those even for partition related requests.

Adds the given command into the tracker so it can be retrieved when needed.

Parameters:

  • command (Request)

    command we want to schedule



32
33
34
35
36
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 32

def <<(command)
  @mutex.synchronize do
    @requests[command[:subscription_group_id]] << command
  end
end

#each_for(subscription_group_id, &block) {|given| ... } ⇒ Object

Selects all incoming command requests for given subscription group and iterates over them. It removes selected requests during iteration.

Parameters:

  • subscription_group_id (String)

    id of the subscription group for which we want to get all the requests. Subscription groups ids (not names) are unique within the application, so it is unique “enough”.

  • block (Proc)

Yield Parameters:

  • given (Request)

    command request for the requested subscription group



47
48
49
50
51
52
53
54
55
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 47

def each_for(subscription_group_id, &block)
  requests = nil

  @mutex.synchronize do
    requests = @requests.delete(subscription_group_id)
  end

  (requests || EMPTY_ARRAY).each(&block)
end