Module: Karafka::Pro::BaseConsumer
- Defined in:
- lib/karafka/pro/base_consumer.rb
Overview
Extra methods always used in the base consumer in the pro mode
We do not define those methods as part of the strategies flows, because they are injected (strategies) on singletons and often used only in one of the strategy variants
Methods here are suppose to be always available or are expected to be redefined
Instance Method Summary collapse
-
#errors_tracker ⇒ Karafka::Pro::Processing::Coordinators::ErrorsTracker
Tracker for errors that occurred during processing until another successful processing.
-
#subscription_groups_coordinator ⇒ Karafka::Pro::Processing::SubscriptionGroupsCoordinator
Coordinator allowing to pause and resume polling of the given subscription group jobs queue for postponing further work.
Instance Method Details
#errors_tracker ⇒ Karafka::Pro::Processing::Coordinators::ErrorsTracker
This will always contain only details of errors that occurred during #consume
because only those are retryable.
This may contain more than one error because: - this can collect various errors that might have happened during virtual partitions execution - errors can pile up during retries and until a clean run, they will be collected with a limit of last 100. We do not store more because a consumer with an endless error loop would cause memory leaks without such a limit.
Returns tracker for errors that occurred during processing until another successful processing.
35 36 37 |
# File 'lib/karafka/pro/base_consumer.rb', line 35 def errors_tracker coordinator.errors_tracker end |
#subscription_groups_coordinator ⇒ Karafka::Pro::Processing::SubscriptionGroupsCoordinator
Since this stops polling, it can cause reaching max.poll.interval.ms
limitations.
This is a low-level API used for cross-topic coordination and some advanced features. Use it at own risk.
Returns Coordinator allowing to pause and resume polling of the given subscription group jobs queue for postponing further work.
47 48 49 |
# File 'lib/karafka/pro/base_consumer.rb', line 47 def subscription_groups_coordinator Processing::SubscriptionGroupsCoordinator.instance end |