Active Job is a standard interface for interacting with job runners in Ruby on Rails. Active Job can be configured to work with Karafka.

## Active Job Setup

The Active Job adapter must be set to `:karafka`, or it will use the default value provided by Rails, which is `:async`. This can be done in `config/application.rb`:

```ruby
class Application < Rails::Application
  # ...
  config.active_job.queue_adapter = :karafka
end
```

We can use the generator to create a new job:

```
rails generate job Example
```

The above command will create `app/jobs/example_job.rb`:

```ruby
class ExampleJob < ActiveJob::Base
  # Set the topic name to default
  queue_as :default

  def perform(*args)
    # Perform Job
  end
end
```

For the Karafka server to understand which topics contain Active Job data, you need to indicate this in your `karafka.rb` routing section using `#active_job_topic`:

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    active_job_topic :default
  end
end
```

`#active_job_topic`, similar to `#topic`, accepts a block for additional configuration:

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    active_job_topic :default do
      # Available only in Pro
      long_running_job true
    end
  end
end
```

!!! note ""

    The [Pro Enhanced ActiveJob](https://karafka.io/docs/Pro-Enhanced-Active-Job.md) adapter supports `Long-Running Jobs`, `Virtual Partitions`, `Ordered Jobs`, `Scheduled Jobs`, and other Pro features.

## Usage

Jobs can be added to the job queue from anywhere. You can add a job to the queue by:

```ruby
ExampleJob.perform_later args
```

At this point, Karafka will run the job for us. If the job fails, Karafka will retry the job as normal.

### Enqueuing Modes

#### `#perform_later`

When you enqueue a job using `#perform_later`, Karafka, by default, will produce a message to Kafka asynchronously. The job will be added to a background process queue and dispatched without blocking the processing flow.

You may want to alter this behavior for critical jobs and use synchronous enqueuing. To do so, call the `karafka_options` method within your job class definition and set `dispatch_method` to `:produce_sync` as follows:

```ruby
class Job < ActiveJob::Base
  queue_as :my_kafka_jobs

  karafka_options(
    dispatch_method: :produce_sync
  )

  def perform(value1, value2)
    puts "value1: #{value1}"
    puts "value2: #{value2}"
  end
end

Job.perform_later(1, 2)
```

#### `#perform_all_later`

When you enqueue jobs using `#perform_all_later`, Karafka, by default, will produce messages to Kafka asynchronously. The jobs will be added to a background process queue and dispatched without blocking the processing flow.

You may want to alter this behavior for critical jobs and use synchronous enqueuing. To do so, call the `karafka_options` method within your job class definition and set `dispatch_many_method` to `:produce_many_sync` as follows:

```ruby
class Job < ActiveJob::Base
  queue_as :my_kafka_jobs

  karafka_options(
    dispatch_many_method: :produce_many_sync
  )

  def perform(value1, value2)
    puts "value1: #{value1}"
    puts "value2: #{value2}"
  end
end

jobs = 2.times.map { |i| Job.new(i, i + 1) }

Job.perform_all_later(jobs)
```
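Note that the enqueuing examples above use `queue_as :my_kafka_jobs`. As with the `:default` topic in the setup section, each queue your jobs dispatch to needs a matching `active_job_topic` entry in the routing. Below is an illustrative `karafka.rb` fragment; the `:my_kafka_jobs` name is only the one used in the examples above, so adjust it to your own queue names:

```ruby
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # One `active_job_topic` entry per Active Job queue in use
    active_job_topic :default
    active_job_topic :my_kafka_jobs
  end
end
```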
## `karafka_options` Partial Inheritance

When an ActiveJob class defines `karafka_options`, these options are inherited by any subclass of the job. This inheritance mechanism ensures that all the settings are consistently applied across different jobs, simplifying configuration management and promoting reusability.

By default, when a subclass inherits from a parent job class with predefined `karafka_options`, the subclass automatically inherits all of these options. If no explicit `karafka_options` are defined in the subclass, it will use the options set in its parent class.

However, when `karafka_options` are set in a subclass, it does not have to redefine all the options specified in the parent class. Instead, it can overwrite only specific options. Karafka will merge the options defined in the subclass with those of the parent class. This merging ensures that any option not explicitly overridden in the subclass retains its value from the parent class.

For example, consider a parent job class configured with multiple `karafka_options`:

```ruby
class ParentJob < ApplicationJob
  karafka_options(
    dispatch_method: :produce_sync,
    dispatch_many_method: :produce_many_async
  )
end
```

If a subclass intends to modify only the `dispatch_method` option, it can do so without having to redefine all other options:

```ruby
class ChildJob < ParentJob
  karafka_options dispatch_method: :produce_async
end
```

In this case, `ChildJob` will have `dispatch_many_method` taken from `ParentJob`. This partial overriding of options allows for flexible configuration adjustments in subclassed jobs, making it easier to manage variations in job behavior without duplicating the entire set of options across multiple classes.

## Execution Warranties

Karafka marks each job as consumed using `#mark_as_consumed` after successfully processing it. This means that the same job should not be processed twice unless the process is killed before the async marking in Kafka happens.

## Behaviour on Errors

The Active Job Karafka adapter follows the Karafka general [runtime errors handling](https://karafka.io/docs/Error-handling-and-back-off-policy.md#runtime) strategy. Upon error, the partition will be paused, a backoff will happen, and Karafka will attempt to retry the job after a specific time. Please keep in mind that **as long as** the error persists, **no** other jobs from the given partition will be processed.
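If you want visibility into these pauses and retries, you can subscribe to Karafka's instrumentation notifications. Below is a minimal sketch, assuming the standard `error.occurred` event; `ErrorTracker` is a hypothetical stand-in for whatever error reporting service you use:

```ruby
# karafka.rb (or an initializer)

# Report consumption errors, including failed Active Job executions,
# so persistently failing jobs that block a partition get noticed quickly.
Karafka.monitor.subscribe('error.occurred') do |event|
  error = event[:error]
  type = event[:type]

  # ErrorTracker is a hypothetical reporting class - replace it with
  # your error tracking service of choice
  ErrorTracker.notify(error, tags: { karafka_error_type: type })
end
```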