Class: Karafka::Pro::Processing::JobsQueue

Inherits:
Karafka::Processing::JobsQueue
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/jobs_queue.rb

Overview

Enhanced processing queue that makes it possible to build complex work-distribution schedulers dedicated to particular job types.

Aside from the OSS queue capabilities, it allows for jobless locking, which advanced schedulers can use to hold a subscription group without enqueuing a job.

Instance Attribute Summary

Instance Method Summary

Methods inherited from Karafka::Processing::JobsQueue

#<<, #close, #complete, #pop, #statistics, #tick

Constructor Details

#initialize ⇒ Karafka::Pro::Processing::JobsQueue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 24

def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end
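
The two hashes built in the constructor rely on Ruby's default-block form of Hash.new, which is also why #register only needs to read @locks[group_id] to create the per-group lock hash. The following is a minimal standalone sketch of that behavior; build_queue_state and the group names are hypothetical and not part of Karafka.

```ruby
# Standalone sketch, no Karafka required. All names here are illustrative only.
def build_queue_state
  {
    # Per-group list of jobs that hold a lock without being enqueued
    in_waiting: Hash.new { |hash, key| hash[key] = [] },
    # Per-group map of lock_id => expiry time
    locks: Hash.new { |hash, key| hash[key] = {} }
  }
end

state = build_queue_state

# Merely reading a missing key stores a fresh container for that group
state[:locks]['group-a']
state[:in_waiting]['group-a'] << :job_1

state[:locks].key?('group-a')      # => true
state[:in_waiting]['group-a']      # => [:job_1]
state[:in_waiting].key?('group-b') # => false, never accessed
```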

Instance Attribute Details

#in_processing ⇒ Object

Returns the value of attribute in_processing.



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 16

def in_processing
  @in_processing
end

Instance Method Details

#clear(group_id) ⇒ Object

Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 124

def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end

#empty?(group_id) ⇒ Boolean

Checks whether a given group has nothing in processing, nothing waiting, and no active async locks.

Parameters:

  • group_id (String)

Returns:

  • (Boolean)

    true when nothing from the given group is in processing or waiting for processing



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 142

def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end

#lock(job) ⇒ Object

Locks the queue of a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.

Parameters:

  • job (Jobs::Base)

    job used for locking



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 49

def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
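
As an illustration of jobless locking, the sketch below mimics the bookkeeping of #lock and #unlock in a standalone class. WaitingRegistry, its error class, and locked? are hypothetical names introduced for this example and are not Karafka's API. Unlike the listing for #unlock, the sketch adjusts its counter only after a successful removal.

```ruby
# Hypothetical stand-in for the queue's waiting registry; not Karafka's API.
class WaitingRegistry
  SynchronizationError = Class.new(StandardError)

  attr_reader :waiting_count

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |hash, key| hash[key] = [] }
    @waiting_count = 0
  end

  # Marks the group as busy on behalf of a job that is not enqueued yet
  def lock(group_id, job)
    @mutex.synchronize do
      group = @in_waiting[group_id]
      # The same job must not lock the group twice
      raise SynchronizationError, group_id if group.include?(job)

      @waiting_count += 1
      group << job
    end
  end

  # Releases the hold once the scheduler is ready to enqueue (or drop) the job
  def unlock(group_id, job)
    @mutex.synchronize do
      # Unlocking a job that never locked the group is a synchronization error
      raise SynchronizationError, group_id unless @in_waiting[group_id].delete(job)

      @waiting_count -= 1
    end
  end

  def locked?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

registry = WaitingRegistry.new
registry.lock('group-a', :job_1)
registry.locked?('group-a') # => true
registry.unlock('group-a', :job_1)
registry.locked?('group-a') # => false
```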

#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object

Note:

Unlike #lock, this method does not raise Errors::JobsQueueSynchronizationError when the lock already exists, because we want to be able to prolong time-limited locks.

Allows for explicit locking of the queue of a given subscription group.

This can be used for cross-topic synchronization.

Parameters:

  • group_id (String)

    id of the group we want to lock

  • lock_id (Object)

    unique id we want to use to identify our lock

  • timeout (Integer) (defaults to: WAIT_TIMEOUT)

    how long this lock should be valid, in milliseconds. Useful for auto-expiring locks used to delay further processing without explicit pausing on the consumer



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 90

def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
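
To show why re-locking with the same lock_id prolongs a lock rather than raising, here is a minimal standalone sketch of expiry-based locks. TimedLocks and the explicit now: argument are hypothetical and exist only to make the example deterministic. The expiry check in locked_async? is an assumption, since that method's body is not shown on this page.

```ruby
# Hypothetical sketch of time-limited locks; not Karafka's implementation.
class TimedLocks
  def initialize
    @mutex = Mutex.new
    @locks = Hash.new { |hash, key| hash[key] = {} }
  end

  # Stores (or overwrites) the expiry for lock_id, so a repeated call prolongs it
  def lock_async(group_id, lock_id, timeout:, now: monotonic_now)
    @mutex.synchronize { @locks[group_id][lock_id] = now + timeout }
  end

  # Assumed check: a group is locked while any of its locks has not expired
  def locked_async?(group_id, now: monotonic_now)
    @mutex.synchronize do
      @locks[group_id].any? { |_lock_id, expires_at| expires_at > now }
    end
  end

  private

  # Monotonic clock in milliseconds, immune to wall-clock adjustments
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
  end
end

locks = TimedLocks.new
locks.lock_async('group-a', :sync, timeout: 100, now: 0)  # expires at 100
locks.lock_async('group-a', :sync, timeout: 100, now: 80) # prolonged to 180
locks.locked_async?('group-a', now: 150) # => true
locks.locked_async?('group-a', now: 200) # => false
```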

#register(group_id) ⇒ Object

Registers the semaphore and a lock hash for the given group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 37

def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end

#unlock(job) ⇒ Object

Unlocks the queue space of a given subscription group that was locked via #lock with a job that was never added to the queue.

Parameters:

  • job (Jobs::Base)

    job that locked the queue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 66

def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end

#unlock_async(group_id, lock_id) ⇒ Object

Allows for explicit unlocking of a group's locked queue.

Parameters:

  • group_id (String)

    id of the group we want to unlock

  • lock_id (Object)

    unique id we want to use to identify our lock



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 108

def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end

#wait(group_id) ⇒ Object

Note:

Because the check for whether async locking is on happens during regular ticking, the first lock on a group can take up to one tick to take effect. That is expected.

Note:

This implementation takes into consideration temporary async locks that can happen. Thanks to the fact that we use the minimum lock time as a timeout, we do not have to wait a whole ticking period to unlock async locks.

Blocks when there are things in the queue in a given group and waits until all the blocking jobs from that group are completed or any of the locks times out.

Parameters:

  • group_id (String)

    id of the group whose jobs we are interested in.

See Also:

  • Karafka::Processing::JobsQueue


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 161

def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
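
The timeout selection inside the loop can be isolated as a pure function. The sketch below is a standalone approximation under the assumption that lock expirations and the current time are monotonic milliseconds; next_wait_timeout and its parameter names are hypothetical and not part of Karafka.

```ruby
# Hypothetical helper mirroring the timeout selection in #wait.
# All inputs are in milliseconds, the result is in seconds.
def next_wait_timeout(lock_expirations_ms, now_ms, tick_interval_ms)
  # Regular ticking period, used as the upper bound for a single wait
  base_interval = tick_interval_ms / 1_000.0

  # Remaining time of every lock, converted from ms to seconds
  wait_times = lock_expirations_ms.map { |expires_at| (expires_at - now_ms) / 1_000.0 }

  # Locks that already expired must not shorten the wait
  wait_times.delete_if(&:negative?)
  wait_times << base_interval

  # Wake up as soon as the earliest lock may have expired
  wait_times.min
end

next_wait_timeout([10_250.0, 12_000.0], 10_000.0, 5_000) # => 0.25
next_wait_timeout([9_000.0], 10_000.0, 5_000)            # => 5.0
```

Because the shortest remaining lock time is always compared against the tick interval, a single wait never exceeds one regular tick, and it ends earlier when a lock is about to expire.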