Class: Karafka::Pro::ActiveJob::Dispatcher
- Inherits: ActiveJob::Dispatcher
  - Object
  - ActiveJob::Dispatcher
  - Karafka::Pro::ActiveJob::Dispatcher
- Defined in: lib/karafka/pro/active_job/dispatcher.rb
Overview
Pro dispatcher that sends each ActiveJob job to the topic matching its queue name and allows additional options to be injected into the producer, giving more granular control over the dispatch and consumption process.
Instance Method Summary
- #dispatch(job) ⇒ Object
- #dispatch_at(job, timestamp) ⇒ Object
  Will enqueue a job to run in the future.
- #dispatch_many(jobs) ⇒ Object
  Bulk dispatches multiple jobs using the Rails 7.1+ API.
Instance Method Details
#dispatch(job) ⇒ Object
    # File 'lib/karafka/pro/active_job/dispatcher.rb', line 41

    def dispatch(job)
      producer(job).public_send(
        fetch_option(job, :dispatch_method, DEFAULTS),
        dispatch_details(job).merge!(
          topic: job.queue_name,
          payload: ::ActiveSupport::JSON.encode(serialize_job(job))
        )
      )
    end
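The listing above picks the producer method by name and calls it through `public_send`. The following is a minimal, self-contained sketch of that pattern only. `RecordingProducer`, `SketchJob`, `SKETCH_DEFAULTS`, and `sketch_dispatch` are hypothetical names invented for illustration, the default value is an assumption, and the standard library `JSON` stands in for `ActiveSupport::JSON`. It does not use or reproduce Karafka itself.

```ruby
require "json"

# Hypothetical stand-in for a Kafka producer: it only records what it is asked to send.
class RecordingProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce_async(message)
    @calls << [:produce_async, message]
  end

  def produce_sync(message)
    @calls << [:produce_sync, message]
  end
end

# Hypothetical job object, not a real ActiveJob class.
SketchJob = Struct.new(:queue_name, :arguments, :options)

# Assumed default for this sketch only.
SKETCH_DEFAULTS = { dispatch_method: :produce_async }.freeze

# Resolve the producer method by name (per-job option first, then the default)
# and send one message whose topic is the job's queue name.
def sketch_dispatch(producer, job)
  method_name = job.options.fetch(:dispatch_method, SKETCH_DEFAULTS[:dispatch_method])

  producer.public_send(
    method_name,
    { topic: job.queue_name, payload: JSON.generate("arguments" => job.arguments) }
  )
end

producer = RecordingProducer.new
sketch_dispatch(producer, SketchJob.new("default", [1], {}))
sketch_dispatch(producer, SketchJob.new("critical", [2], { dispatch_method: :produce_sync }))
```

Because the method is looked up at call time, a per-job option can switch between asynchronous and synchronous delivery without any branching in the dispatcher.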
#dispatch_at(job, timestamp) ⇒ Object
Will enqueue a job to run in the future.

    # File 'lib/karafka/pro/active_job/dispatcher.rb', line 85

    def dispatch_at(job, timestamp)
      target_message = dispatch_details(job).merge!(
        topic: job.queue_name,
        payload: ::ActiveSupport::JSON.encode(serialize_job(job))
      )

      proxy_message = Pro::ScheduledMessages.schedule(
        message: target_message,
        epoch: timestamp.to_i,
        envelope: {
          # Select the scheduled messages proxy topic
          topic: fetch_option(job, :scheduled_messages_topic, DEFAULTS)
        }
      )

      producer(job).public_send(
        fetch_option(job, :dispatch_method, DEFAULTS),
        proxy_message
      )
    end
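The listing above builds the job message first, then wraps it in a second message aimed at a scheduling proxy topic, with the run time passed as integer epoch seconds. The sketch below illustrates only that two-step idea. `sketch_schedule` and the `sketch_*` header names are hypothetical and are not Karafka's real envelope format; what `Pro::ScheduledMessages.schedule` actually produces is not shown in this document.

```ruby
# Hypothetical wrapper: returns a proxy message that targets the envelope topic
# and carries the original topic and the epoch alongside the original payload.
# This is an illustration, not the format used by Karafka.
def sketch_schedule(message:, epoch:, envelope:)
  envelope.merge(
    payload: message[:payload],
    headers: {
      "sketch_target_topic" => message[:topic],
      "sketch_epoch" => epoch.to_s
    }
  )
end

# The message the job would normally be dispatched as.
target = { topic: "default", payload: '{"job_class":"SketchJob"}' }

# Time#to_i yields whole seconds since the Unix epoch, matching `timestamp.to_i`.
run_at = Time.at(1_700_000_000)

proxy = sketch_schedule(
  message: target,
  epoch: run_at.to_i,
  envelope: { topic: "scheduled_jobs" }
)
```

The point of the wrapping is that the producer sends `proxy`, not `target`: the message first lands in the proxy topic, and the original topic travels with it as metadata.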
#dispatch_many(jobs) ⇒ Object
Bulk dispatches multiple jobs using the Rails 7.1+ API.

    # File 'lib/karafka/pro/active_job/dispatcher.rb', line 53

    def dispatch_many(jobs)
      # First level is type of dispatch and second is the producer we want to use to dispatch
      dispatches = Hash.new do |hash, key|
        hash[key] = Hash.new do |hash2, key2|
          hash2[key2] = []
        end
      end

      jobs.each do |job|
        d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)
        producer = producer(job)

        dispatches[d_method][producer] << dispatch_details(job).merge!(
          topic: job.queue_name,
          payload: ::ActiveSupport::JSON.encode(serialize_job(job))
        )
      end

      dispatches.each do |d_method, producers|
        producers.each do |producer, messages|
          producer.public_send(
            d_method,
            messages
          )
        end
      end
    end
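The grouping step in the listing above relies on a two-level hash whose default blocks create missing buckets on first access. The sketch below isolates that technique in plain Ruby. `group_messages` and the symbol values standing in for producers are hypothetical and exist only for illustration.

```ruby
# Group entries as: dispatch method => producer => array of messages.
# Both levels are created lazily by the Hash default blocks.
def group_messages(entries)
  groups = Hash.new do |hash, key|
    hash[key] = Hash.new { |inner, inner_key| inner[inner_key] = [] }
  end

  entries.each do |entry|
    groups[entry[:method]][entry[:producer]] << entry[:message]
  end

  groups
end

# Symbols stand in for producer objects in this sketch.
entries = [
  { method: :produce_many_async, producer: :main,  message: { topic: "default" } },
  { method: :produce_many_async, producer: :main,  message: { topic: "mailers" } },
  { method: :produce_many_sync,  producer: :audit, message: { topic: "billing" } }
]

grouped = group_messages(entries)

# One bulk call per (method, producer) pair, as in the second loop of the listing.
grouped.each do |method_name, producers|
  producers.each do |producer, messages|
    puts "#{producer}.#{method_name} with #{messages.size} message(s)"
  end
end
```

Grouping first means each producer receives a single batch per dispatch method, instead of one call per job.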