Class: Karafka::Web::Processing::Consumer
- Inherits: BaseConsumer (Object → BaseConsumer → Karafka::Web::Processing::Consumer)
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/web/processing/consumer.rb
Overview
Consumer used to squash and process statistics coming from particular processes, so this data can be read and used. We consume this info, overwriting any data we previously had.
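The "overwrite what we previously had" behaviour is essentially last-write-wins aggregation keyed by process. A minimal sketch (the class and field names below are hypothetical illustrations, not the library's actual API):

```ruby
# Minimal last-write-wins aggregator sketch: each process periodically reports
# its current state, and a newer report for the same process replaces the old
# one. StateAggregatorSketch is a hypothetical name for illustration only.
class StateAggregatorSketch
  def initialize
    @processes = {}
  end

  # Overwrite whatever we previously knew about this process (if anything)
  def add(report)
    @processes[report[:process_id]] = report
  end

  def stats
    @processes.values
  end
end

aggregator = StateAggregatorSketch.new
aggregator.add(process_id: 'worker-1', messages: 10)
aggregator.add(process_id: 'worker-1', messages: 25) # newer report wins
puts aggregator.stats.first[:messages] # prints 25
```

Because each report fully replaces the previous one for its process, replaying the same batch of reports is idempotent, which is what makes the "squash" approach safe across restarts.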
Instance Method Summary
-
#consume ⇒ Object
Aggregates consumers' state into a single current state representation.
-
#shutdown ⇒ Object
Flush final state on shutdown.
Instance Method Details
#consume ⇒ Object
Aggregates consumers' state into a single current state representation.
# File 'lib/karafka/web/processing/consumer.rb', line 15

def consume
  bootstrap!

  consumers_messages = messages.select { |message| message.payload[:type] == 'consumer' }

  # If there is even one incompatible message, we need to stop
  consumers_messages.each do |message|
    case @reports_schema_manager.call(message)
    when :current
      true
    when :newer
      @reports_schema_manager.invalidate!
      dispatch
      raise ::Karafka::Web::Errors::Processing::IncompatibleSchemaError
    # Older reports mean someone is in the middle of upgrade. Schema change related
    # upgrades always should happen without a rolling-upgrade. For such, since we cannot
    # reason about their statistics structure, we only track state, so we can provide
    # basic upgrade reporting details in the status page. All other data is rejected and
    # since in most cases this is intermediate due to rolling upgrades, this should not
    # significantly impact the state tracking and processing.
    when :older
      @state_aggregator.add_state(message.payload, message.offset)
      next
    else
      raise ::Karafka::Errors::UnsupportedCaseError
    end

    # We need to run the aggregations on each message in order to compensate for
    # potential lags.
    @state_aggregator.add(message.payload, message.offset)
    @metrics_aggregator.add_report(message.payload)
    @metrics_aggregator.add_stats(@state_aggregator.stats)

    # Indicates that we had at least one report we used to enrich data
    # If there were no state changes, there is no reason to flush data. This can occur
    # when we had some messages but we skipped them for any reason on a first run
    @established = true

    # Optimize memory usage in pro
    message.clean! if Karafka.pro?
  end

  return unless periodic_flush?

  dispatch

  mark_as_consumed(messages.last)
end
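The three-way schema decision above (`:current` / `:newer` / `:older`) boils down to comparing a report's schema version against the version this consumer understands. A hedged sketch of that comparison, assuming semantic version strings (the method and constant below are hypothetical, not the library's actual schema manager API):

```ruby
# Hypothetical sketch of the compatibility check: compare a report's schema
# version against the version this process was built for.
# REPORT_SCHEMA_VERSION and schema_compatibility are illustration-only names.
REPORT_SCHEMA_VERSION = '1.0.0'

def schema_compatibility(report_version)
  case Gem::Version.new(report_version) <=> Gem::Version.new(REPORT_SCHEMA_VERSION)
  when 0
    :current # same schema: process normally
  when 1
    :newer   # report from an already-upgraded process: flush and stop
  else
    :older   # report from a not-yet-upgraded process: track state only
  end
end

puts schema_compatibility('1.0.0') # prints "current"
```

Treating `:newer` as fatal is deliberate: a newer schema means this process is the outdated one, so it flushes what it has and raises rather than misinterpret data it cannot reason about.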
#shutdown ⇒ Object
Flush final state on shutdown.
# File 'lib/karafka/web/processing/consumer.rb', line 67

def shutdown
  dispatch
end