Class: Karafka::Web::Processing::Consumers::SchemaManager
- Inherits: Object
- Object
- Karafka::Web::Processing::Consumers::SchemaManager
- Defined in:
- lib/karafka/web/processing/consumers/schema_manager.rb
Overview
The schema manager is responsible for making sure that the consumer report messages we consume have a schema compatible with the current process that is supposed to materialize them.
In general, we always support at least one major version back, and we recommend upgrading incrementally through each intermediate version (0.5 => 0.6 => 0.7).
This is needed in scenarios where a rolling deploy would get new karafka processes reporting data while consumption would still run in older ones.
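The compatibility check relies on ordering versions, and the `#call` method documented on this page does that with `::Gem::Version`. The sketch below shows why a version object is used rather than a plain string comparison. The version strings are hypothetical and chosen only to expose the difference; they are not the library's real schema versions.

```ruby
require 'rubygems'

# Hypothetical version strings, used only to show the ordering rules.
older = Gem::Version.new('1.9.0')
newer = Gem::Version.new('1.10.0')

# Plain strings compare character by character, so "1.10.0" sorts before "1.9.0".
string_says_smaller = '1.10.0' < '1.9.0'

# Gem::Version compares segment by segment, so 10 is greater than 9.
version_says_greater = newer > older

puts "string comparison:  1.10.0 < 1.9.0 is #{string_says_smaller}"
puts "version comparison: 1.10.0 > 1.9.0 is #{version_says_greater}"
```

Comparing parsed versions keeps the older/newer decision correct once any segment reaches two digits.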
Instance Method Summary
- #call(message) ⇒ Symbol
Reports whether the given message uses an older, newer, or current schema.
- #initialize ⇒ SchemaManager
constructor
A new instance of SchemaManager.
- #invalidate! ⇒ Object
Moves the schema manager state to incompatible to indicate in the Web-UI that we cannot move forward because the schema is incompatible.
- #to_s ⇒ String
State that we can use in the materialized state for the UI reporting.
Constructor Details
#initialize ⇒ SchemaManager
Returns a new instance of SchemaManager.
# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 24

    def initialize
      @cache = {}
      @valid = true
    end
Instance Method Details
#call(message) ⇒ Symbol
Returns whether the given message uses an older, newer, or current schema.
# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 31

    def call(message)
      schema_version = message.payload[:schema_version]

      # Save on memory allocation by reusing
      # Most of the time we will deal with compatible schemas, so it is not worth creating
      # an object with each message
      message_version = @cache[schema_version] ||= ::Gem::Version.new(schema_version)

      return :older if message_version < CURRENT_VERSION
      return :newer if message_version > CURRENT_VERSION

      :current
    end
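To make the caching behaviour concrete, here is a minimal standalone sketch of the same classification logic. It is not the library's class: `SketchSchemaManager`, `StubMessage`, and the `'1.3.0'` value of `CURRENT_VERSION` are all assumptions made for illustration, and the stub only mimics the `payload[:schema_version]` access seen above.

```ruby
require 'rubygems'

# Hypothetical stand-in for a consumed message; only #payload is used here.
StubMessage = Struct.new(:payload)

# Hypothetical sketch class, not Karafka's SchemaManager.
class SketchSchemaManager
  # Assumed value for illustration; the real constant is defined by the library.
  CURRENT_VERSION = ::Gem::Version.new('1.3.0')

  attr_reader :cache

  def initialize
    @cache = {}
  end

  # Classifies the message schema as :older, :newer or :current.
  def call(message)
    schema_version = message.payload[:schema_version]

    # One Gem::Version object per distinct version string, reused across messages.
    message_version = @cache[schema_version] ||= ::Gem::Version.new(schema_version)

    return :older if message_version < CURRENT_VERSION
    return :newer if message_version > CURRENT_VERSION

    :current
  end
end

manager = SketchSchemaManager.new

%w[1.2.0 1.3.0 1.4.0 1.3.0].each do |version|
  result = manager.call(StubMessage.new({ schema_version: version }))
  puts "#{version} is #{result}"
end

puts "cached version objects: #{manager.cache.size}"
```

Because most reports carry the same schema version, the hash lookup replaces a fresh `Gem::Version` allocation on nearly every call.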
#invalidate! ⇒ Object
The state switch is one-directional. Once we encounter an incompatible message, we need to stop processing, so further checks, even on valid messages, must not switch the state back to valid.
Moves the schema manager state to incompatible to indicate in the Web-UI that we cannot move forward because the schema is incompatible.
# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 51

    def invalidate!
      @valid = false
    end
#to_s ⇒ String
Returns state that we can use in the materialized state for the UI reporting.
# File 'lib/karafka/web/processing/consumers/schema_manager.rb', line 56

    def to_s
      @valid ? 'compatible' : 'incompatible'
    end
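The one-directional state described for `#invalidate!` can be sketched in isolation. `SketchCompatibilityState` is a hypothetical class that copies only the `@valid` flag and the two methods shown above; it has no method that resets the flag, which is what makes the switch irreversible.

```ruby
# Hypothetical sketch class mirroring only the state handling shown above.
class SketchCompatibilityState
  def initialize
    @valid = true
  end

  # Flips the state to incompatible; nothing in the class flips it back.
  def invalidate!
    @valid = false
  end

  # Human-readable state, suitable for string interpolation in a report.
  def to_s
    @valid ? 'compatible' : 'incompatible'
  end
end

state = SketchCompatibilityState.new
puts "before: #{state}"

state.invalidate!
state.invalidate! # Calling it again changes nothing.
puts "after:  #{state}"
```

Defining `to_s` lets the object be interpolated directly into whatever string or structure carries the materialized state.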