Class: Karafka::Pro::Processing::JobsQueue
- Inherits: Karafka::Processing::JobsQueue (Object < Karafka::Processing::JobsQueue < Karafka::Pro::Processing::JobsQueue)
- Includes: Core::Helpers::Time
- Defined in: lib/karafka/pro/processing/jobs_queue.rb
Overview
Enhanced processing queue that makes it possible to build complex work-distribution schedulers dedicated to particular job types.
In addition to the OSS queue capabilities, it supports jobless locking for advanced schedulers.
Instance Attribute Summary
- #in_processing ⇒ Object
  Returns the value of attribute in_processing.
Instance Method Summary
- #clear(group_id) ⇒ Object
  Clears the processing states for a provided group.
- #empty?(group_id) ⇒ Boolean
  Returns whether a given group has nothing in processing or waiting and is not async-locked.
- #initialize ⇒ Karafka::Pro::Processing::JobsQueue
  Constructor.
- #lock(job) ⇒ Object
  Locks the queue on a given subscription group without enqueuing a job.
- #lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
  Allows for explicit locking of the queue of a given subscription group.
- #register(group_id) ⇒ Object
  Registers a semaphore and a lock hash for a given group.
- #unlock(job) ⇒ Object
  Unlocks the given subscription group's queue space that was locked via #lock with a job that was never added to the queue.
- #unlock_async(group_id, lock_id) ⇒ Object
  Allows for explicit unlocking of the locked queue of a group.
- #wait(group_id) ⇒ Object
  Blocks while a given group has jobs in the queue and waits until all of that group's blocking jobs are completed or any of the locks times out.
Methods inherited from Karafka::Processing::JobsQueue
#<<, #close, #complete, #pop, #statistics, #tick
Constructor Details
#initialize ⇒ Karafka::Pro::Processing::JobsQueue
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 24
def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end
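Both hashes in the constructor use a default block, so per-group entries are created on first access. A plain-Ruby illustration of that behaviour, independent of Karafka (the local variables below merely mirror the instance variables above):

```ruby
# A missing key is initialised by the block and stored in the hash on first
# read, so callers never need to check whether a group was seen before.
in_waiting = Hash.new { |h, k| h[k] = [] }
locks = Hash.new { |h, k| h[k] = {} }

in_waiting[:group_a] << :job_1      # creates [] for :group_a, then appends
locks[:group_a][:lock_1] = 1_000.0  # creates {} for :group_a, then assigns

# Even a plain read registers the key.
in_waiting[:group_b]
in_waiting.key?(:group_b) # => true
```

Note that Hash.new([]) would not work here: it hands out one shared array as the default and never stores it under the key.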
Instance Attribute Details
#in_processing ⇒ Object
Returns the value of attribute in_processing.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 16
def in_processing
  @in_processing
end
Instance Method Details
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 124
def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
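The order of operations in #clear matters: the shared waiting counter is reduced by the size of the group's waiting list before that list is emptied, so other groups' entries stay counted. A minimal sketch of that bookkeeping, using a hypothetical MiniGroupState class (not part of Karafka) that omits ticking and the async-locking flag:

```ruby
# Hypothetical, simplified per-group state; not Karafka's class.
class MiniGroupState
  attr_reader :waiting_count

  def initialize
    @mutex = Mutex.new
    @in_processing = Hash.new { |h, k| h[k] = [] }
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @locks = Hash.new { |h, k| h[k] = {} }
    @waiting_count = 0
  end

  def add_waiting(group_id, job)
    @mutex.synchronize do
      @in_waiting[group_id] << job
      @waiting_count += 1
    end
  end

  # Drops one group's state; the counter is adjusted before the list is
  # cleared, otherwise the size would already be zero.
  def clear(group_id)
    @mutex.synchronize do
      @in_processing[group_id].clear
      @waiting_count -= @in_waiting[group_id].size
      @in_waiting[group_id].clear
      @locks[group_id].clear
    end
  end
end

state = MiniGroupState.new
state.add_waiting(:a, :job_1)
state.add_waiting(:a, :job_2)
state.add_waiting(:b, :job_3)
state.clear(:a)
state.waiting_count # => 1, only :b's job is still counted
```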
#empty?(group_id) ⇒ Boolean
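Returns whether there is nothing in processing (or waiting for processing) for a given group and the group is not async-locked.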
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 142
def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end
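The three conditions checked by #empty? can be expressed as a pure function for illustration. The source of locked_async? is not shown here, so this sketch assumes that an async lock counts as active until its deadline (on a monotonic millisecond clock) has passed; group_empty? is a hypothetical helper, not a Karafka method:

```ruby
# Hypothetical helper, not part of Karafka.
# async_locks maps lock_id => deadline in milliseconds; a lock is ASSUMED to
# be active until now_ms passes its deadline.
def group_empty?(in_processing, in_waiting, async_locks, now_ms)
  active_lock = async_locks.values.any? { |deadline| deadline > now_ms }
  in_processing.empty? && in_waiting.empty? && !active_lock
end

group_empty?([], [], {}, 0)                  # => true
group_empty?([:job], [], {}, 0)              # => false, a job is still processing
group_empty?([], [], { sync: 100.0 }, 50.0)  # => false, lock still active
group_empty?([], [], { sync: 100.0 }, 150.0) # => true, lock expired
```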
#lock(job) ⇒ Object
Locks the queue on a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 49
def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
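To show the jobless-locking idea in isolation, here is a minimal plain-Ruby sketch. MiniLockingQueue, Job and SyncError are hypothetical stand-ins (SyncError plays the role of Errors::JobsQueueSynchronizationError); the real class also tracks statistics and cooperates with the scheduler, which is omitted:

```ruby
# Hypothetical stand-ins; not Karafka's classes.
class SyncError < StandardError; end

# Struct equality is value-based, so an identical job counts as a duplicate.
Job = Struct.new(:group_id, :name)

class MiniLockingQueue
  attr_reader :waiting_count

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting_count = 0
  end

  # Marks the job as waiting for its group without pushing it to any queue.
  # Locking the same job twice is treated as a synchronization bug.
  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]
      raise(SyncError, job.group_id.to_s) if group.include?(job)

      @waiting_count += 1
      group << job
    end
  end

  # A group holding any locked job should not be considered done.
  def waiting?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

queue = MiniLockingQueue.new
job = Job.new(:group_a, "postponed")
queue.lock(job)
queue.waiting?(:group_a) # => true
```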
#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
Allows for explicit locking of the queue of a given subscription group.
This can be used for cross-topic synchronization.
Note: unlike #lock, this method does not raise Errors::JobsQueueSynchronizationError when a lock with the same lock_id already exists, because we want to be able to prolong time-limited locks.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 90
def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
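The prolonging behaviour follows from storing exactly one deadline per lock_id: locking again with the same id overwrites the deadline. The sketch below assumes, consistent with the ms-to-seconds conversion in #wait, that timeouts and deadlines are in milliseconds on a monotonic clock; MiniAsyncLocks is hypothetical and omits the closed-queue check and ticking:

```ruby
# Hypothetical sketch of id-keyed, time-limited locks; not Karafka's class.
class MiniAsyncLocks
  def initialize
    @mutex = Mutex.new
    @locks = Hash.new { |h, k| h[k] = {} }
  end

  # Monotonic clock in milliseconds, unaffected by wall-clock adjustments.
  def now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end

  # Re-locking with an existing lock_id overwrites its deadline, which
  # prolongs the lock instead of raising.
  def lock_async(group_id, lock_id, timeout_ms:)
    @mutex.synchronize do
      @locks[group_id][lock_id] = now_ms + timeout_ms
    end
  end

  def deadline(group_id, lock_id)
    @mutex.synchronize { @locks[group_id][lock_id] }
  end
end

locks = MiniAsyncLocks.new
locks.lock_async(:group_a, :sync, timeout_ms: 1_000)
first = locks.deadline(:group_a, :sync)
locks.lock_async(:group_a, :sync, timeout_ms: 5_000)
locks.deadline(:group_a, :sync) > first # => true, the lock was prolonged
```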
#register(group_id) ⇒ Object
Registers a semaphore and a lock hash for a given group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 37
def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end
#unlock(job) ⇒ Object
Unlocks the given subscription group's queue space that was locked via #lock with a job that was never added to the queue.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 66
def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end
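The early return in #unlock relies on Array#delete returning the removed element, or nil when the element is absent. A hypothetical sketch of the matching lock/unlock pair (MiniUnlockingQueue and SyncError are illustrative stand-ins, not Karafka classes):

```ruby
# Hypothetical stand-ins; not Karafka's classes.
class SyncError < StandardError; end

class MiniUnlockingQueue
  attr_reader :waiting_count

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting_count = 0
  end

  def lock(group_id, job)
    @mutex.synchronize do
      @in_waiting[group_id] << job
      @waiting_count += 1
    end
  end

  # Array#delete returns the removed element, or nil when it was not present,
  # so a nil result means the job was never locked.
  def unlock(group_id, job)
    @mutex.synchronize do
      raise(SyncError, group_id.to_s) unless @in_waiting[group_id].delete(job)

      # Decremented only once the job is confirmed to have been locked.
      @waiting_count -= 1
    end
  end
end

queue = MiniUnlockingQueue.new
queue.lock(:group_a, :job_1)
queue.unlock(:group_a, :job_1)
queue.waiting_count # => 0
```

Unlike the method above, this sketch decrements the counter only after the job is confirmed to be present, so a failed unlock leaves the counter unchanged.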
#unlock_async(group_id, lock_id) ⇒ Object
Allows for explicit unlocking of the locked queue of a group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 108
def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end
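A sketch of the unlock-then-wake pattern. It assumes that ticking amounts to pushing a token onto the group's semaphore queue (the one #wait pops from) so that a blocked waiter re-checks its state immediately; MiniAsyncUnlock and SyncError are hypothetical:

```ruby
# Hypothetical stand-ins; not Karafka's classes.
class SyncError < StandardError; end

class MiniAsyncUnlock
  def initialize
    @mutex = Mutex.new
    @locks = Hash.new { |h, k| h[k] = {} }
    # One semaphore-like queue per group; waiters block on #pop.
    @semaphores = Hash.new { |h, k| h[k] = Queue.new }
  end

  def lock_async(group_id, lock_id, deadline_ms)
    @mutex.synchronize { @locks[group_id][lock_id] = deadline_ms }
  end

  def unlock_async(group_id, lock_id)
    @mutex.synchronize do
      # Hash#delete returns the removed value, or nil when the key is missing.
      raise(SyncError, [group_id, lock_id].inspect) unless @locks[group_id].delete(lock_id)

      # ASSUMED "tick": push a token so a waiter wakes up right away.
      @semaphores[group_id] << true
    end
  end

  def pending_ticks(group_id)
    @mutex.synchronize { @semaphores[group_id].size }
  end
end

locks = MiniAsyncUnlock.new
locks.lock_async(:group_a, :sync, 5_000.0)
locks.unlock_async(:group_a, :sync)
locks.pending_ticks(:group_a) # => 1
```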
#wait(group_id) ⇒ Object
Blocks while a given group has jobs in the queue and waits until all of that group's blocking jobs are completed or any of the locks times out.
This implementation takes temporary async locks into account. Because the minimum remaining lock time is used as the timeout, we do not have to wait a whole ticking period for async locks to expire.
Note: because the check for whether async locking is on happens on regular ticking, the first lock on a group can take up to one tick. This is expected.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 161
def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
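The timeout computed on each loop iteration can be isolated as a pure function: remaining lock times are converted from milliseconds to seconds, expired locks are dropped, and the regular tick interval serves as an upper bound. next_wait_timeout is a hypothetical helper mirroring that arithmetic:

```ruby
# Hypothetical helper mirroring the timeout computation inside #wait.
# lock_deadlines_ms: lock deadlines on a monotonic millisecond clock.
# tick_interval_ms: the regular ticking period in milliseconds.
def next_wait_timeout(lock_deadlines_ms, now_ms, tick_interval_ms)
  base_interval = tick_interval_ms / 1_000.0
  wait_times = lock_deadlines_ms.map { |deadline| (deadline - now_ms) / 1_000.0 }
  # Expired locks would yield a negative timeout, so they are ignored.
  wait_times.delete_if(&:negative?)
  # The regular tick is always a candidate, so the result never exceeds it.
  wait_times << base_interval
  wait_times.min
end

next_wait_timeout([10_250.0, 12_000.0], 10_000.0, 1_000) # => 0.25
next_wait_timeout([9_000.0], 10_000.0, 1_000)            # => 1.0
```

In the method above, this value is passed as the timeout: keyword of Queue#pop on the group's semaphore (Thread::Queue#pop accepts timeout: since Ruby 3.2), so a waiter wakes up as soon as the earliest lock expires rather than at the next regular tick.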