Class: Karafka::Web::Processing::Consumer
- Inherits:
- BaseConsumer
  - Object
  - BaseConsumer
  - Karafka::Web::Processing::Consumer
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/processing/consumer.rb
Overview
Consumer used to squash and process the statistics reported by individual processes, so that this data can be read and used. Consuming this information overwrites any data we previously had.
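The overwrite behaviour described above can be pictured with a small standalone sketch. It does not use any Karafka Web class: `SquashedState`, its `add` method, and the `:process_id` key are hypothetical names chosen for illustration, and the real aggregators may key and merge data differently.

```ruby
# Hypothetical illustration only; not part of Karafka Web.
# Keeps the most recent report per reporting process, replacing older data.
class SquashedState
  def initialize
    @reports = {}
  end

  # A later report for the same process overwrites the earlier one
  def add(report)
    @reports[report.fetch(:process_id)] = report
  end

  # Returns a copy of the current per-process view
  def to_h
    @reports.dup
  end
end

state = SquashedState.new
state.add(process_id: 'p1', listeners: 1)
state.add(process_id: 'p2', listeners: 2)
state.add(process_id: 'p1', listeners: 3) # replaces the first 'p1' report
```

After these calls the state holds two entries, and the `'p1'` entry is the last report that was added for it.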
Instance Method Summary
- #consume ⇒ Object
  Aggregates consumers state into a single current state representation.
- #shutdown ⇒ Object
  Flush final state on shutdown.
Instance Method Details
#consume ⇒ Object
Aggregates consumers state into a single current state representation
    # File 'lib/karafka/web/processing/consumer.rb', line 15

    def consume
      bootstrap!

      consumers_messages = messages.select { |message| message.payload[:type] == 'consumer' }

      # If there is even one incompatible message, we need to stop
      consumers_messages.each do |message|
        case @reports_schema_manager.call(message)
        when :current
          true
        when :newer
          @reports_schema_manager.invalidate!

          dispatch

          raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError
        # Older reports mean someone is in the middle of upgrade. Schema change related
        # upgrades always should happen without a rolling-upgrade, hence we can reject those
        # requests without significant or any impact on data quality but without having to
        # worry about backwards compatibility. Errors are tracked independently, so it should
        # not be a problem.
        #
        # In case user wants to do a rolling upgrade, the user docs state that this can happen
        # and it is something user should be aware of
        when :older
          next
        else
          raise ::Karafka::Errors::UnsupportedCaseError
        end

        # We need to run the aggregations on each message in order to compensate for
        # potential lags.
        @state_aggregator.add(message.payload, message.offset)
        @metrics_aggregator.add_report(message.payload)
        @metrics_aggregator.add_stats(@state_aggregator.stats)
        # Indicates that we had at least one report we used to enrich data
        # If there were no state changes, there is no reason to flush data. This can occur
        # when we had some messages but we skipped them for any reason on a first run
        @established = true

        # Optimize memory usage in pro
        message.clean! if Karafka.pro?
      end

      return unless periodic_flush?

      dispatch

      mark_as_consumed(messages.last)
    end
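The three-way schema check in `#consume` (aggregate current reports, stop on newer ones, skip older ones) can be tried in isolation with the sketch below. Everything in it is a stand-in: the error classes, `CURRENT_SCHEMA`, `compatibility`, and `usable_reports` are hypothetical names, and representing the schema version as an integer is an assumption made only to keep the comparison simple. It is not how `@reports_schema_manager` is implemented.

```ruby
# Hypothetical stand-ins; none of these names come from Karafka Web.
class IncompatibleSchemaError < StandardError; end
class UnsupportedCaseError < StandardError; end

CURRENT_SCHEMA = 2 # assumed integer version, for illustration only

# Classifies a report relative to the schema this process understands.
# Returns nil when the version cannot be compared (for example, when it is nil).
def compatibility(report)
  case report.fetch(:schema_version) <=> CURRENT_SCHEMA
  when 0 then :current
  when 1 then :newer
  when -1 then :older
  end
end

# Returns the reports that would be aggregated.
# Raises as soon as a report with a newer schema is seen.
def usable_reports(reports)
  reports.each_with_object([]) do |report, accepted|
    case compatibility(report)
    when :current
      accepted << report
    when :newer
      raise IncompatibleSchemaError
    when :older
      next # skipped, as during an upgrade
    else
      raise UnsupportedCaseError
    end
  end
end

accepted = usable_reports([{ schema_version: 1 }, { schema_version: 2 }])
```

Here the older report is dropped without raising and only the current one is kept. A report with `schema_version: 3` would raise `IncompatibleSchemaError`, mirroring the `:newer` branch above.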
#shutdown ⇒ Object
Flush final state on shutdown
    # File 'lib/karafka/web/processing/consumer.rb', line 67

    def shutdown
      dispatch
    end
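Taken together, `#consume` and `#shutdown` follow a common pattern: flush only when a periodic check passes, then flush once more on shutdown so nothing pending is lost. The sketch below shows that pattern on its own. `FlushingBuffer`, the `interval:` and `clock:` parameters, and the internals of `periodic_flush?` are assumptions for illustration; the source does not show how the real `periodic_flush?` or `dispatch` are implemented.

```ruby
# Hypothetical illustration; the class, interval, and injected clock are assumptions.
class FlushingBuffer
  attr_reader :flushed

  def initialize(interval:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @clock = clock
    @pending = []
    @flushed = []
    @flushed_at = @clock.call
  end

  # Buffers the item and flushes only if the interval has elapsed
  def add(item)
    @pending << item
    flush if periodic_flush?
  end

  # Final flush, regardless of how much time has passed
  def shutdown
    flush
  end

  private

  def periodic_flush?
    @clock.call - @flushed_at >= @interval
  end

  def flush
    @flushed.concat(@pending)
    @pending.clear
    @flushed_at = @clock.call
  end
end

# A fake clock makes the timing deterministic
now = 0
buffer = FlushingBuffer.new(interval: 5, clock: -> { now })
buffer.add(:a)  # within the interval, stays pending
now = 6
buffer.add(:b)  # interval elapsed, :a and :b are flushed
buffer.add(:c)  # pending again
buffer.shutdown # final flush picks up :c
```

Injecting the clock is a design choice for the example only: it lets the flush timing be exercised without sleeping.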