Class: Karafka::Web::Processing::Consumers::ReportsMigrator

Inherits:
Object
Defined in:
lib/karafka/web/processing/consumers/reports_migrator.rb

Overview

Migrator for consumer reports that applies per-message transformations.

Unlike Management::Migrator, which operates on aggregated states, this migrator runs on each individual consumer report as it’s processed.

This is necessary because:

  • Reports are continuously published during upgrades
  • Reports are short-lived (TTL-based) and don’t need persistent migrations
  • Old schema reports may remain in Kafka topics for extended periods

Migrations are lightweight transformations that normalize old report formats to work with current processing code. Migrations are stored in lib/karafka/web/management/migrations/consumers_reports/ alongside other migrations.
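
As a rough illustration of what such a lightweight transformation might look like, the sketch below renames a field in reports older than a given schema version. The class name, the version threshold, and the field names are hypothetical and chosen only for this example; the only parts taken from this page are the class-level applicable?(schema_version) check and the instance-level migrate(report) call that #call relies on.

```ruby
require 'rubygems' # provides Gem::Version for version comparison

# Hypothetical migration: the name, threshold, and fields are illustrative only
class RenameOldFieldExample
  # Assumed cut-off: reports below this schema version still use the old field
  TARGET_VERSION = Gem::Version.new('1.3.0')

  # Class-level check, matching how #call invokes it
  def self.applicable?(schema_version)
    Gem::Version.new(schema_version) < TARGET_VERSION
  end

  # Mutates the report in place; does nothing if the old field is absent
  def migrate(report)
    return unless report.key?(:old_field)

    report[:new_field] = report.delete(:old_field)
  end
end

report = { schema_version: '1.2.0', old_field: 5 }

if RenameOldFieldExample.applicable?(report[:schema_version])
  RenameOldFieldExample.new.migrate(report)
end
```

Because the migration only normalizes the in-memory hash, nothing has to be written back to Kafka, which fits the short-lived nature of reports described above.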

Instance Method Summary

Instance Method Details

#call(report) ⇒ Hash

Applies all applicable migrations to a consumer report.

Parameters:

  • report (Hash)

    deserialized consumer report

Returns:

  • (Hash)

    the same report object, potentially modified in place



# File 'lib/karafka/web/processing/consumers/reports_migrator.rb', line 25

def call(report)
  # Apply each applicable migration in order
  migrations.each do |migration_class|
    next unless migration_class.applicable?(report[:schema_version])

    migration_class.new.migrate(report)
  end

  report
end
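
The behavior of #call can be tried out in isolation with a small stand-in. The sketch below is not the real class: ExampleReportsMigrator takes its migration list through a constructor (the real migrations method is not shown on this page), and both migration classes are hypothetical. It is meant only to show that non-applicable migrations are skipped and that the returned value is the very same Hash that was passed in.

```ruby
# Stand-in migrator; the constructor-injected list is an assumption for this demo
class ExampleReportsMigrator
  def initialize(migrations)
    @migrations = migrations
  end

  # Same loop as the #call shown above, run over the injected list
  def call(report)
    @migrations.each do |migration_class|
      next unless migration_class.applicable?(report[:schema_version])

      migration_class.new.migrate(report)
    end

    report
  end
end

# Hypothetical migration that applies only to schema version 1.0.0
class AddDefaultTagsExample
  def self.applicable?(schema_version)
    schema_version == '1.0.0'
  end

  def migrate(report)
    report[:tags] ||= []
  end
end

# Hypothetical migration that never applies, so it must never run
class NeverApplicableExample
  def self.applicable?(_schema_version)
    false
  end

  def migrate(report)
    report[:touched] = true
  end
end

migrator = ExampleReportsMigrator.new([AddDefaultTagsExample, NeverApplicableExample])
old_report = { schema_version: '1.0.0' }
result = migrator.call(old_report)

same_object = result.equal?(old_report) # identity check, not just equality
```

Since the report is modified in place, callers that need the original payload untouched would have to duplicate it before calling the migrator.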