Class: Karafka::Processing::Jobs::Consume
Defined in: lib/karafka/processing/jobs/consume.rb
Overview
The main job type. It runs the executor, which triggers processing of the given topic partition's messages in the underlying consumer instance.
Direct Known Subclasses
Instance Attribute Summary
- #messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)
  Array with messages.
Attributes inherited from Base
Instance Method Summary
- #after_call ⇒ Object
  Runs any error handling and other post-consumption work on the executor.
- #before_call ⇒ Object
  Runs the pre-consumption preparations on the executor.
- #before_schedule ⇒ Object
  Runs all the preparation code on the executor that needs to happen before the job is scheduled.
- #call ⇒ Object
  Runs the given executor.
- #initialize(executor, messages) ⇒ Consume (constructor)
Methods inherited from Base
#finish!, #finished?, #non_blocking?
Constructor Details
#initialize(executor, messages) ⇒ Consume
# File 'lib/karafka/processing/jobs/consume.rb', line 16
def initialize(executor, messages)
  @executor = executor
  @messages = messages
  super()
end
Instance Attribute Details
#messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)
Returns the array of messages.
# File 'lib/karafka/processing/jobs/consume.rb', line 10
def messages
  @messages
end
Instance Method Details
#after_call ⇒ Object
Runs any error handling and other post-consumption work on the executor.
# File 'lib/karafka/processing/jobs/consume.rb', line 39
def after_call
  executor.after_consume
end
#before_call ⇒ Object
Runs the pre-consumption preparations on the executor.
# File 'lib/karafka/processing/jobs/consume.rb', line 29
def before_call
  executor.before_consume
end
#before_schedule ⇒ Object
Runs all the preparation code on the executor that needs to happen before the job is scheduled.
# File 'lib/karafka/processing/jobs/consume.rb', line 24
def before_schedule
  executor.before_schedule_consume(@messages)
end
#call ⇒ Object
Runs the given executor.
# File 'lib/karafka/processing/jobs/consume.rb', line 34
def call
  executor.consume
end
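The four hooks above each delegate to a single executor method, so the job is essentially a thin lifecycle wrapper. The following is a minimal, self-contained sketch of that delegation. It does not load Karafka: `RecordingExecutor` and `SketchConsumeJob` are hypothetical stand-ins written for illustration, the sketch job does not inherit from the real `Base` class, and the call order (`before_schedule`, then `before_call`, `call`, `after_call`) is an assumption based on the method descriptions, not something this page states.

```ruby
# Hypothetical stand-in for the executor: it only records which hook was invoked.
class RecordingExecutor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def before_schedule_consume(messages)
    @calls << [:before_schedule_consume, messages.size]
  end

  def before_consume
    @calls << [:before_consume]
  end

  def consume
    @calls << [:consume]
  end

  def after_consume
    @calls << [:after_consume]
  end
end

# Simplified mirror of the job documented above (illustration only,
# without the real Base superclass).
class SketchConsumeJob
  attr_reader :executor, :messages

  def initialize(executor, messages)
    @executor = executor
    @messages = messages
  end

  def before_schedule
    executor.before_schedule_consume(@messages)
  end

  def before_call
    executor.before_consume
  end

  def call
    executor.consume
  end

  def after_call
    executor.after_consume
  end
end

executor = RecordingExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3])

# Assumed order: the scheduling hook first, then the three execution hooks.
job.before_schedule
job.before_call
job.call
job.after_call

executor.calls.each { |entry| puts entry.inspect }
```

Only `before_schedule` passes the messages along; in this sketch the other hooks take no arguments, matching the method bodies listed above.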