Class: Karafka::Web::Processing::Consumers::State
- Inherits:
-
Object
- Object
- Karafka::Web::Processing::Consumers::State
- Defined in:
- lib/karafka/web/processing/consumers/state.rb
Overview
Fetches the current consumer processes aggregated state
Class Method Summary collapse
-
.current! ⇒ Hash
Fetch the current consumers state that is expected to exist.
Class Method Details
.current! ⇒ Hash
Fetch the current consumers state that is expected to exist
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/karafka/web/processing/consumers/state.rb', line 13 def current! = ::Karafka::Admin.read_topic( Karafka::Web.config.topics.consumers.states, 0, # We need to take more in case there would be transactions running. # In theory we could take two but this compensates for any involuntary # revocations and cases where two producers would write to the same state 5 ).last return .payload if raise(::Karafka::Web::Errors::Processing::MissingConsumersStateError) rescue Rdkafka::RdkafkaError => e raise(e) unless e.code == :unknown_partition raise(::Karafka::Web::Errors::Processing::MissingConsumersStatesTopicError) end |