Class: Karafka::Web::Processing::Consumers::SchemaManager

Inherits:
Object
Defined in:
lib/karafka/web/processing/consumers/schema_manager.rb

Overview

The schema manager is responsible for making sure that the consumer report messages we consume have a schema compatible with the current process that is supposed to materialize them.

In general, we always support at least one major version back, and we recommend upgrading sequentially through each version rather than skipping any (0.5 => 0.6 => 0.7).

This is needed in scenarios where a rolling deploy would get new Karafka processes reporting data while consumption still runs in older ones.

Constructor Details

#initialize ⇒ SchemaManager

Returns a new instance of SchemaManager.



# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 24

def initialize
  @cache = {}
  @valid = true
end

Instance Method Details

#call(message) ⇒ Symbol

Returns whether the given message uses an older, newer, or current schema.

Parameters:

  • message (Karafka::Messages::Message)

    consumer report

Returns:

  • (Symbol)

whether the given message uses an older, newer, or current schema



# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 31

def call(message)
  schema_version = message.payload[:schema_version]

  # Save on memory allocation by reusing cached version objects.
  # Most of the time we deal with compatible schemas, so it is not worth creating
  # a new object for each message
  message_version = @cache[schema_version] ||= ::Gem::Version.new(schema_version)

  return :older if message_version < CURRENT_VERSION
  return :newer if message_version > CURRENT_VERSION

  :current
end
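
The comparison above relies on `Gem::Version` ordering rather than plain string ordering. The following is a minimal standalone sketch of that idea, not the library's code: `SKETCH_CURRENT_VERSION` and `classify` are hypothetical names, and the version value is an assumption chosen only to show a case where string comparison would give the wrong answer.

```ruby
require 'rubygems'

# Hypothetical current schema version (assumption for illustration only;
# the real CURRENT_VERSION is defined outside the code shown on this page)
SKETCH_CURRENT_VERSION = Gem::Version.new('1.10.0')

# Classifies a raw version string, caching the parsed Gem::Version per string
# so repeated reports with the same version do not allocate new objects
def classify(schema_version, cache)
  version = cache[schema_version] ||= Gem::Version.new(schema_version)

  return :older if version < SKETCH_CURRENT_VERSION
  return :newer if version > SKETCH_CURRENT_VERSION

  :current
end

cache = {}
results = %w[1.9.0 1.10.0 1.11.0 1.9.0].map { |v| classify(v, cache) }

puts results.inspect
puts cache.size
```

Compared as plain strings, `'1.9.0'` would sort after `'1.10.0'`; `Gem::Version` compares segment by segment as numbers, so `1.9.0` is correctly treated as older. The cache ends up with one entry per distinct version string.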

#invalidate! ⇒ Object

Note:

The state switch is one-directional. If we encounter an incompatible message, we need to stop processing, so further checks, even with valid messages, should not switch the state back to valid.

Moves the schema manager state to incompatible to indicate in the Web-UI that we cannot move forward because schema is incompatible.



# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 51

def invalidate!
  @valid = false
end

#to_s ⇒ String

Returns the state that we can use in the materialized state for UI reporting.

Returns:

  • (String)

the state that we can use in the materialized state for UI reporting



# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 56

def to_s
  @valid ? 'compatible' : 'incompatible'
end
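
As a rough illustration of how the three methods might be used together, here is a self-contained sketch. `SchemaManagerSketch` is a stand-in that mirrors the methods documented on this page, not the real class; `Report`, the `CURRENT_VERSION` value, and the handling of each outcome (skip `:older`, stop on `:newer`) are assumptions made for the example.

```ruby
require 'rubygems'

# Stand-in mirroring the documented methods (not the real Karafka class)
class SchemaManagerSketch
  # Hypothetical value; the real constant is defined elsewhere
  CURRENT_VERSION = Gem::Version.new('1.2.0')

  def initialize
    @cache = {}
    @valid = true
  end

  def call(message)
    schema_version = message.payload[:schema_version]
    message_version = @cache[schema_version] ||= Gem::Version.new(schema_version)

    return :older if message_version < CURRENT_VERSION
    return :newer if message_version > CURRENT_VERSION

    :current
  end

  # One-way switch: nothing ever sets @valid back to true
  def invalidate!
    @valid = false
  end

  def to_s
    @valid ? 'compatible' : 'incompatible'
  end
end

# Hypothetical stand-in for a consumer report message
Report = Struct.new(:payload)

manager = SchemaManagerSketch.new
reports = %w[1.2.0 1.1.0 1.3.0 1.2.0].map do |version|
  payload = { schema_version: version }
  Report.new(payload)
end

processed = []

reports.each do |report|
  case manager.call(report)
  when :current
    processed << report
  when :older
    # Assumed handling: skip reports from an older schema
    next
  when :newer
    # Assumed handling: mark as incompatible and stop processing
    manager.invalidate!
    break
  end
end

puts manager.to_s
puts processed.size
```

In this sketch the last report uses the current schema, yet it is never reached, and the state stays `incompatible` because no code path resets `@valid`.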