Class: ActiveJob::QueueAdapters::KarafkaAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job/queue_adapters/karafka_adapter.rb

Overview

Karafka adapter for enqueuing jobs This is here for ease of integration with ActiveJob.

Instance Method Summary collapse

Instance Method Details

#enqueue(job) ⇒ Object

Enqueues the job using the configured dispatcher

Parameters:

  • job (Object)

    job that should be enqueued



41
42
43
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 41

def enqueue(job)
  dispatcher.dispatch(job)
end

#enqueue_after_transaction_commit?true

Returns should we by default enqueue after the transaction and not during. Defaults to true to prevent weird issues during rollbacks, etc.

Returns:

  • (true)

    should we by default enqueue after the transaction and not during. Defaults to true to prevent weird issues during rollbacks, etc.



64
65
66
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 64

def enqueue_after_transaction_commit?
  true
end

#enqueue_all(jobs) ⇒ Integer

Enqueues multiple jobs in one go

Parameters:

  • jobs (Array<Object>)

    jobs that we want to enqueue

Returns:

  • (Integer)

    number of jobs enqueued (required by Rails)



48
49
50
51
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 48

def enqueue_all(jobs)
  dispatcher.dispatch_many(jobs)
  jobs.size
end

#enqueue_at(job, timestamp) ⇒ Object

Delegates time sensitive dispatch to the dispatcher. OSS will raise error, Pro will handle this as it supports scheduled messages.

Parameters:

  • job (Object)

    job we want to enqueue

  • timestamp (Time)

    time when job should run



58
59
60
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 58

def enqueue_at(job, timestamp)
  dispatcher.dispatch_at(job, timestamp)
end

#stopping?Boolean

Returns should we stop the job. Used by the ActiveJob continuation feature.

Returns:

  • (Boolean)

    should we stop the job. Used by the ActiveJob continuation feature



69
70
71
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 69

def stopping?
  ::Karafka::App.done?
end