Class: Karafka::Processing::TimedQueue
- Inherits:
- Object
- Object
- Karafka::Processing::TimedQueue
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/processing/timed_queue.rb
Overview
Minimal queue with a timeout on #pop, for Ruby 3.1 and lower.
It is needed because Ruby only gained a timeout on Queue#pop in version 3.2.
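For comparison, the built-in behavior that this class backfills can be sketched as follows. The helper names `native_pop_timeout?` and `native_timed_pop` are hypothetical and exist only for this illustration; the version check is a simple assumption about how one might detect Ruby 3.2+.

```ruby
# Reports whether this Ruby has the built-in timeout on Queue#pop (added in 3.2).
def native_pop_timeout?
  major_minor = RUBY_VERSION.split('.').first(2).map(&:to_i)
  (major_minor <=> [3, 2]) >= 0
end

# Pops with a timeout in seconds where Ruby supports it natively.
# Returns nil on timeout; raises NotImplementedError on older Rubies,
# which is the gap TimedQueue fills.
def native_timed_pop(queue, seconds)
  raise NotImplementedError, 'Queue#pop(timeout:) needs Ruby 3.2+' unless native_pop_timeout?

  queue.pop(timeout: seconds)
end

queue = Queue.new
queue << :job

if native_pop_timeout?
  p native_timed_pop(queue, 0.05) # the item is already there, returned at once
  p native_timed_pop(queue, 0.05) # the queue is empty, nil after about 0.05 seconds
end
```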
Instance Method Summary
-
#close ⇒ Object
Closes the internal queue and wakes every thread waiting in #pop.
-
#initialize ⇒ TimedQueue
constructor
A new instance of TimedQueue.
-
#pop(timeout: 10_000_000_000) ⇒ Object
No timeout means waiting up to roughly 317 years (the default of 10_000_000_000 seconds).
-
#push(obj) ⇒ Object
(also: #<<)
Adds element to the queue.
Constructor Details
#initialize ⇒ TimedQueue
Returns a new instance of TimedQueue.
# File 'lib/karafka/processing/timed_queue.rb', line 11
def initialize
  @queue = Queue.new
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new
end
Instance Method Details
#close ⇒ Object
Closes the internal queue and wakes every thread waiting in #pop.
# File 'lib/karafka/processing/timed_queue.rb', line 54
def close
  @mutex.synchronize do
    @queue.close
    @resource.broadcast
  end
end
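Closing relies on how Ruby's own Queue behaves once closed: items already queued can still be popped, an empty closed queue returns nil instead of blocking, and further pushes raise ClosedQueueError. The sketch below shows this on a plain Queue, not on TimedQueue itself; `drain_closed` is a hypothetical helper for the illustration.

```ruby
# Drains a closed queue: remaining items come out first, then nil.
def drain_closed(queue)
  items = []
  items << queue.pop until queue.empty?
  items << queue.pop # empty and closed: returns nil without blocking
  items
end

queue = Queue.new
queue << :a
queue << :b
queue.close

p queue.closed?       # true
p drain_closed(queue) # [:a, :b, nil]

begin
  queue << :c
rescue ClosedQueueError => e
  puts "push after close raised #{e.class}"
end
```

This is why a waiter woken by the broadcast in #close can safely call pop on the internal queue: it gets either a leftover item or nil.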
#pop(timeout: 10_000_000_000) ⇒ Object
Note:
The timeout is in seconds because that is how Ruby 3.2+ works, and we want the same API on newer and older Ruby versions.
No timeout means waiting up to roughly 317 years (the default of 10_000_000_000 seconds).
# File 'lib/karafka/processing/timed_queue.rb', line 36
def pop(timeout: 10_000_000_000)
  deadline = monotonic_now + timeout * 1000

  @mutex.synchronize do
    loop do
      return @queue.pop unless @queue.empty?
      return @queue.pop if @queue.closed?

      to_wait = (deadline - monotonic_now) / 1_000.0

      return nil if to_wait <= 0

      @resource.wait(@mutex, to_wait)
    end
  end
end
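The deadline loop can be tried without Karafka using the standalone sketch below. `SketchTimedQueue` and `now_ms` are hypothetical names for this illustration only; `now_ms` stands in for `monotonic_now`, which is assumed here to return milliseconds, as the `timeout * 1000` conversion suggests. The closed-queue branch is left out to keep the sketch short.

```ruby
# Standalone sketch of the deadline loop; SketchTimedQueue is a hypothetical
# stand-in for illustration, not the Karafka class.
class SketchTimedQueue
  def initialize
    @queue = Queue.new
    @mutex = Mutex.new
    @resource = ConditionVariable.new
  end

  def push(obj)
    @mutex.synchronize do
      @queue << obj
      @resource.broadcast # wake waiters so they re-check the queue
    end
  end

  # timeout is in seconds, as in Ruby 3.2+ Queue#pop
  def pop(timeout:)
    deadline = now_ms + timeout * 1_000

    @mutex.synchronize do
      loop do
        return @queue.pop unless @queue.empty?

        to_wait = (deadline - now_ms) / 1_000.0
        return nil if to_wait <= 0

        # The wait may end early (spurious wakeup, or another consumer took
        # the item), so the loop re-checks and recomputes the remaining time
        @resource.wait(@mutex, to_wait)
      end
    end
  end

  private

  # Assumption: stands in for monotonic_now, taken to be in milliseconds
  def now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

queue = SketchTimedQueue.new
producer = Thread.new do
  sleep 0.05
  queue.push(:job)
end

p queue.pop(timeout: 1)    # :job, well before the 1 second deadline
p queue.pop(timeout: 0.05) # nil, nothing else was pushed
producer.join
```

Recomputing `to_wait` from a fixed deadline on every pass keeps the total wait bounded by the requested timeout, however many times the thread is woken.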
#push(obj) ⇒ Object Also known as: <<
Adds element to the queue.
# File 'lib/karafka/processing/timed_queue.rb', line 20
def push(obj)
  @mutex.synchronize do
    @queue << obj
    @resource.broadcast
  end
end