Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker
- Inherits:
-
Object
- Object
- Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb
Overview
Tracker used to record incoming partition related operational requests until they are executable or invalid. It stores the requests as they come for execution pre-polling.
Instance Method Summary collapse
-
#<<(command) ⇒ Object
Adds the given command into the tracker so it can be retrieved when needed.
-
#each_for(subscription_group_id, &block) {|given| ... } ⇒ Object
Selects all incoming command requests for given subscription group and iterates over them.
-
#initialize ⇒ Tracker
constructor
A new instance of Tracker.
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
22 23 24 25 |
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 22 def initialize @mutex = Mutex.new @requests = Hash.new { |h, k| h[k] = [] } end |
Instance Method Details
#<<(command) ⇒ Object
Note:
We accumulate requests per subscription group because this is the layer of applicability of those even for partition related requests.
Adds the given command into the tracker so it can be retrieved when needed.
32 33 34 35 36 |
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 32 def <<(command) @mutex.synchronize do @requests[command[:subscription_group_id]] << command end end |
#each_for(subscription_group_id, &block) {|given| ... } ⇒ Object
Selects all incoming command requests for given subscription group and iterates over them. It removes selected requests during iteration.
47 48 49 50 51 52 53 54 55 |
# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 47 def each_for(subscription_group_id, &block) requests = nil @mutex.synchronize do requests = @requests.delete(subscription_group_id) end (requests || EMPTY_ARRAY).each(&block) end |