Module: Karafka::Processing::InlineInsights::Consumer
- Defined in:
- lib/karafka/processing/inline_insights/consumer.rb
Overview
Module that adds extra methods to the consumer, allowing us to fetch the insights.
Instance Method Summary
- #insights ⇒ Hash (also: #statistics, #inline_insights)
  Empty hash or hash with given partition insights if already present.
- #insights? ⇒ Boolean (also: #statistics?, #inline_insights?)
  True if there are insights to work with, otherwise false.
Instance Method Details
#insights ⇒ Hash Also known as: statistics, inline_insights
Note:
We cache insights on the consumer because in some scenarios they may no longer be available in the Tracker. For example, after an involuntary revocation, incoming statistics may no longer include insights for the lost partition. To stay consistent during single-batch operations, we ensure that once insights are available, they remain available throughout the whole processing.
Returns empty hash or hash with given partition insights if already present.
# File 'lib/karafka/processing/inline_insights/consumer.rb', line 19

def insights
  insights = Tracker.find(topic, partition)

  # If we no longer have new insights but we still have them locally, we can use them
  return @insights if @insights && insights.empty?
  # If insights are still the same, we can use them
  return @insights if @insights.equal?(insights)

  # If we've received new insights that are not empty, we can cache them
  @insights = insights
end
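The caching behaviour described in the note can be illustrated outside of Karafka with a minimal sketch. FakeTracker and DemoConsumer are hypothetical stand-ins written for this example only; the sketch assumes that Tracker.find returns an empty hash when nothing is known for a partition, and the 'consumer_lag' key is used purely as sample data.

```ruby
# Hypothetical stand-in for the Tracker; only #find is modelled here.
class FakeTracker
  def initialize
    @data = {}
  end

  # Test helper: store insights for a topic partition
  def set(topic, partition, insights)
    @data[[topic, partition]] = insights
  end

  # Test helper: simulate the partition disappearing from incoming statistics
  def clear(topic, partition)
    @data.delete([topic, partition])
  end

  # Assumed behaviour: empty hash when nothing is known for the partition
  def find(topic, partition)
    @data.fetch([topic, partition], {})
  end
end

# Hypothetical consumer that mirrors the caching logic of #insights
class DemoConsumer
  attr_reader :topic, :partition

  def initialize(tracker, topic, partition)
    @tracker = tracker
    @topic = topic
    @partition = partition
  end

  def insights
    insights = @tracker.find(topic, partition)

    # Nothing new from the tracker, but we have a local copy: keep using it
    return @insights if @insights && insights.empty?
    # Same object as before: nothing to update
    return @insights if @insights.equal?(insights)

    @insights = insights
  end

  def insights?
    !insights.empty?
  end
end

tracker = FakeTracker.new
consumer = DemoConsumer.new(tracker, 'events', 0)

before = consumer.insights?                        # nothing tracked yet
tracker.set('events', 0, { 'consumer_lag' => 5 })
fresh = consumer.insights                          # new insights get cached
tracker.clear('events', 0)                         # e.g. partition was lost
cached = consumer.insights                         # cached copy is still returned
```

After the tracker entry is cleared, the consumer keeps returning the hash it cached earlier, which is the consistency guarantee the note refers to.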
#insights? ⇒ Boolean Also known as: statistics?, inline_insights?
Returns true if there are insights to work with, otherwise false.
# File 'lib/karafka/processing/inline_insights/consumer.rb', line 32

def insights?
  !insights.empty?
end
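Since #insights may return an empty hash, #insights? is a natural guard before reading specific values. The following is a rough sketch of that pattern; LagAwareConsumer, its lag_report method, and the 'consumer_lag' key are hypothetical and exist only for illustration, not as part of the module.

```ruby
# Hypothetical class exposing the same two methods as the module above
class LagAwareConsumer
  def initialize(insights)
    @insights = insights
  end

  def insights
    @insights
  end

  def insights?
    !insights.empty?
  end

  # Describes the lag, or returns a fallback when no insights are present yet
  def lag_report
    return 'no insights yet' unless insights?

    "lag: #{insights.fetch('consumer_lag', 'unknown')}"
  end
end

empty_report = LagAwareConsumer.new({}).lag_report
lag_report = LagAwareConsumer.new({ 'consumer_lag' => 12 }).lag_report
```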