Module: Karafka::ActiveJob::CurrentAttributes::Persistence

Defined in:
lib/karafka/active_job/current_attributes/persistence.rb

Overview

Adds current attributes persistence to ActiveJob jobs. This module wraps Dispatcher#serialize_job to inject current attributes into the serialized job.

Instance Method Summary

Instance Method Details

#serialize_job(job) ⇒ String

Note:

This method creates a JobWrapper internally and passes it to the parent’s serialize_job method. The wrapper is transparent to the deserializer.

Wraps Dispatcher#serialize_job to inject current attributes before serialization. This lets us modify the job before it is serialized without modifying ActiveJob::Base.

Parameters:

  • job (ActiveJob::Base)

    the original job to serialize

Returns:

  • (String)

    serialized job payload with current attributes injected



# File 'lib/karafka/active_job/current_attributes/persistence.rb', line 17

def serialize_job(job)
  # Get the job hash
  job_hash = job.serialize

  # Inject current attributes
  _cattr_klasses.each do |key, cattr_klass_str|
    next if job_hash.key?(key)

    attrs = cattr_klass_str.constantize.attributes

    job_hash[key] = attrs unless attrs.empty?
  end

  # Wrap the modified hash in a simple object that implements #serialize
  # This avoids modifying the original job instance
  wrapper = JobWrapper.new(job_hash)

  # Pass the wrapper to the parent's serialize_job
  super(wrapper)
end
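
The wrapping pattern used by this method can be tried in isolation. The following is a minimal sketch in plain Ruby, not the Karafka implementation: FakeCurrent, FakeJob, BaseDispatcher, PersistenceSketch, the cattr_classes helper, and the "cattr_0" key are all hypothetical stand-ins, Object.const_get replaces ActiveSupport's constantize, and mixing the module in with prepend is an assumption made for illustration.

```ruby
require "json"

# Hypothetical stand-in for an ActiveSupport::CurrentAttributes subclass.
class FakeCurrent
  class << self
    attr_writer :attributes

    def attributes
      @attributes ||= {}
    end
  end
end

# Hypothetical stand-in for an ActiveJob job: it only needs #serialize.
class FakeJob
  def initialize(job_id)
    @job_id = job_id
  end

  def serialize
    { "job_id" => @job_id }
  end
end

# Simple object that carries an already-built hash and implements #serialize,
# so the parent serializer can treat it like a job.
class JobWrapper
  def initialize(job_hash)
    @job_hash = job_hash
  end

  def serialize
    @job_hash
  end
end

# Hypothetical stand-in for the dispatcher whose serialize_job gets wrapped.
class BaseDispatcher
  def serialize_job(job)
    JSON.generate(job.serialize)
  end
end

module PersistenceSketch
  def serialize_job(job)
    job_hash = job.serialize

    cattr_classes.each do |key, klass_name|
      # Never overwrite a key that the job hash already contains
      next if job_hash.key?(key)

      attrs = Object.const_get(klass_name).attributes

      # Skip empty attribute sets so the payload stays small
      job_hash[key] = attrs unless attrs.empty?
    end

    # The original job is left untouched; the parent sees only the wrapper
    super(JobWrapper.new(job_hash))
  end

  private

  # Assumed registry: payload key => name of the current attributes class
  def cattr_classes
    { "cattr_0" => "FakeCurrent" }
  end
end

# Assumption: the module is prepended so that super reaches the original method
BaseDispatcher.prepend(PersistenceSketch)

FakeCurrent.attributes = { "user_id" => 42 }
payload = BaseDispatcher.new.serialize_job(FakeJob.new("abc"))
puts payload
```

Because the parent only ever calls #serialize on whatever it receives, the wrapper needs no other job behavior, which is why the job instance and ActiveJob::Base can stay unmodified.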