Class: Karafka::ActiveJob::Dispatcher
- Inherits: Object
  - Object
    - Karafka::ActiveJob::Dispatcher
- Defined in: lib/karafka/active_job/dispatcher.rb
Overview
Dispatcher that sends each ActiveJob job to the topic that matches the job's queue name.
Direct Known Subclasses
Instance Method Summary
- #dispatch(job) ⇒ Object
- #dispatch_at(job, timestamp) ⇒ Object
  Raises NotImplementedError when a job is scheduled for the future, because the Karafka backend does not support scheduling jobs.
- #dispatch_many(jobs) ⇒ Object
  Bulk dispatches multiple jobs using the Rails 7.1+ API.
Instance Method Details
#dispatch(job) ⇒ Object
    # File 'lib/karafka/active_job/dispatcher.rb', line 21
    def dispatch(job)
      Karafka.producer.public_send(
        fetch_option(job, :dispatch_method, DEFAULTS),
        topic: job.queue_name,
        payload: serialize_job(job)
      )
    end
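The routing rule in #dispatch can be tried without a Kafka cluster. What follows is a minimal sketch, not Karafka's implementation: `FakeProducer`, `SketchJob`, and `sketch_dispatch` are hypothetical stand-ins, the payload is plain JSON rather than the output of `serialize_job`, and the `:produce_async` fallback is an assumption, since the source only refers to `DEFAULTS`.

```ruby
require "json"

# Hypothetical stand-in for Karafka.producer: records messages instead of sending them.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce_async(topic:, payload:)
    @sent << { via: :produce_async, topic: topic, payload: payload }
  end

  def produce_sync(topic:, payload:)
    @sent << { via: :produce_sync, topic: topic, payload: payload }
  end
end

# Hypothetical job object exposing only the fields this sketch needs.
SketchJob = Struct.new(:queue_name, :arguments, :dispatch_method)

# Mirrors the shape of #dispatch: choose the producer method per job and
# use the queue name as the topic. The default method is an assumption.
def sketch_dispatch(producer, job)
  producer.public_send(
    job.dispatch_method || :produce_async,
    topic: job.queue_name,
    payload: JSON.generate(job.arguments)
  )
end

producer = FakeProducer.new
sketch_dispatch(producer, SketchJob.new("emails", ["welcome", 42], nil))
sketch_dispatch(producer, SketchJob.new("billing", ["invoice"], :produce_sync))
producer.sent.each { |message| puts message.inspect }
```

Using `public_send` with a per-job method name is what lets each job class choose between synchronous and asynchronous delivery without the dispatcher branching on it.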
#dispatch_at(job, timestamp) ⇒ Object
Karafka Pro supports future jobs via the Scheduled Messages feature.
For ActiveJob Continuation to work without Pro, configure your continuable jobs with:
    self.resume_options = { wait: 0 }
For #retry_on to work without Pro, configure with:
    retry_on SomeError, wait: 0, jitter: 0
Raises NotImplementedError when a job is scheduled for the future, because the Karafka
backend does not support scheduling jobs. Past and present timestamps are dispatched
immediately, which supports features such as continuation and the #retry_on API when they
are configured with no wait and no jitter.
    # File 'lib/karafka/active_job/dispatcher.rb', line 67
    def dispatch_at(job, timestamp)
      # Dispatch at is used by some of the ActiveJob features that actually do not back-off
      # but things go via this API nonetheless.
      if timestamp.to_f <= Time.now.to_f
        dispatch(job)
      else
        raise NotImplementedError, <<~ERROR_MESSAGE
          This queueing backend does not support scheduling future jobs.
          If you're using ActiveJob Continuation, configure your jobs with:
            self.resume_options = { wait: 0 }
          If you're using retry_on, configure with:
            retry_on SomeError, wait: 0, jitter: 0
          For full support of delayed job execution, consider using Karafka Pro with Scheduled Messages.
        ERROR_MESSAGE
      end
    end
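The time check in #dispatch_at can be exercised in isolation. This is a minimal sketch under stated assumptions: `sketch_dispatch_at` is a hypothetical helper, the `now:` parameter is added only to make the example deterministic, and the block stands in for the real call to #dispatch.

```ruby
# Hypothetical helper mirroring the comparison in #dispatch_at.
# The block replaces the real #dispatch call; `now:` exists only for this sketch.
def sketch_dispatch_at(job, timestamp, now: Time.now)
  if timestamp.to_f <= now.to_f
    # Past and present timestamps are dispatched immediately.
    yield job
  else
    raise NotImplementedError, "scheduling future jobs is not supported in this sketch"
  end
end

now = Time.at(1_700_000_000)
dispatched = []

# A Time in the past and a numeric timestamp equal to "now" both pass,
# because the comparison goes through #to_f.
sketch_dispatch_at(:past_job, now - 60, now: now) { |job| dispatched << job }
sketch_dispatch_at(:present_job, now.to_f, now: now) { |job| dispatched << job }

begin
  sketch_dispatch_at(:future_job, now + 60, now: now) { |job| dispatched << job }
rescue NotImplementedError => e
  puts "rejected: #{e.message}"
end

puts dispatched.inspect
```

Note that `NotImplementedError` descends from `ScriptError`, not `StandardError`, so a bare `rescue` will not catch it; it has to be named explicitly as above.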
#dispatch_many(jobs) ⇒ Object
Bulk dispatches multiple jobs using the Rails 7.1+ API
    # File 'lib/karafka/active_job/dispatcher.rb', line 31
    def dispatch_many(jobs)
      # Group jobs by their desired dispatch method
      # It can be configured per job class, so we need to make sure we divide them
      dispatches = Hash.new { |hash, key| hash[key] = [] }

      jobs.each do |job|
        d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)

        dispatches[d_method] << {
          topic: job.queue_name,
          payload: serialize_job(job)
        }
      end

      dispatches.each do |type, messages|
        Karafka.producer.public_send(
          type,
          messages
        )
      end
    end
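The grouping step in #dispatch_many can be illustrated on its own. The sketch below is not the library's code: `SketchBulkJob` and `sketch_group` are hypothetical names, the payloads are plain strings, and the `:produce_many_async` default is an assumption, since the source only refers to `DEFAULTS`.

```ruby
# Hypothetical job object exposing only the fields this sketch needs.
SketchBulkJob = Struct.new(:queue_name, :payload, :dispatch_many_method)

# Groups messages by the bulk dispatch method each job asks for, so that
# every producer method is later called once with all of its messages.
def sketch_group(jobs, default: :produce_many_async)
  groups = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    groups[job.dispatch_many_method || default] << {
      topic: job.queue_name,
      payload: job.payload
    }
  end

  groups
end

jobs = [
  SketchBulkJob.new("emails", "a", nil),
  SketchBulkJob.new("billing", "b", :produce_many_sync),
  SketchBulkJob.new("emails", "c", nil)
]

sketch_group(jobs).each do |type, messages|
  puts "#{type}: #{messages.size} message(s)"
end
```

Grouping first means a batch with mixed per-class settings results in one producer call per dispatch method instead of one call per job.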