Class: Karafka::ActiveJob::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/active_job/dispatcher.rb

Overview

Dispatcher that sends the ActiveJob job to a proper topic based on the queue name

Direct Known Subclasses

Pro::ActiveJob::Dispatcher

Instance Method Summary collapse

Instance Method Details

#dispatch(job) ⇒ Object

Parameters:

  • job (ActiveJob::Base)

    job



21
22
23
24
25
26
27
# File 'lib/karafka/active_job/dispatcher.rb', line 21

def dispatch(job)
  Karafka.producer.public_send(
    fetch_option(job, :dispatch_method, DEFAULTS),
    topic: job.queue_name,
    payload: serialize_job(job)
  )
end

#dispatch_at(job, timestamp) ⇒ Object

Note:

Karafka Pro supports future jobs via the Scheduled Messages feature

Note:

For ActiveJob Continuation to work without Pro, configure your continuable jobs: self.resume_options = { wait: 0 }

Note:

For #retry_on to work without Pro, configure with: retry_on SomeError, wait: 0, jitter: 0

Raises info, that Karafka backend does not support scheduling jobs if someone wants to schedule jobs in the future. It works for past and present because we want to support things like continuation and #retry_on API with no wait and no jitter.

Parameters:

  • job (Object)

    job we cannot enqueue

  • timestamp (Time)

    time when job should run



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/karafka/active_job/dispatcher.rb', line 67

def dispatch_at(job, timestamp)
  # Dispatch at is used by some of the ActiveJob features that actually do not back-off
  # but things go via this API nonetheless.
  if timestamp.to_f <= Time.now.to_f
    dispatch(job)
  else
    raise NotImplementedError, <<~ERROR_MESSAGE
      This queueing backend does not support scheduling future jobs.

      If you're using ActiveJob Continuation, configure your jobs with:
        self.resume_options = { wait: 0 }

      If you're using retry_on, configure with:
        retry_on SomeError, wait: 0, jitter: 0

      For full support of delayed job execution, consider using Karafka Pro with Scheduled Messages.
    ERROR_MESSAGE
  end
end

#dispatch_many(jobs) ⇒ Object

Bulk dispatches multiple jobs using the Rails 7.1+ API

Parameters:

  • jobs (Array<ActiveJob::Base>)

    jobs we want to dispatch



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/karafka/active_job/dispatcher.rb', line 31

def dispatch_many(jobs)
  # Group jobs by their desired dispatch method
  # It can be configured per job class, so we need to make sure we divide them
  dispatches = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)

    dispatches[d_method] << {
      topic: job.queue_name,
      payload: serialize_job(job)
    }
  end

  dispatches.each do |type, messages|
    Karafka.producer.public_send(
      type,
      messages
    )
  end
end