Class: Karafka::Processing::Jobs::Base
- Inherits:
-
Object
- Object
- Karafka::Processing::Jobs::Base
- Extended by:
- Forwardable
- Defined in:
- lib/karafka/processing/jobs/base.rb
Overview
Base class for all the jobs types that are suppose to run in workers threads. Each job can have 3 main entry-points: #before_call
, #call
and #after_call
Only #call
is required.
Direct Known Subclasses
Karafka::Pro::Processing::Jobs::Periodic, Consume, Eofed, Idle, Revoked, Shutdown
Class Attribute Summary collapse
-
.action ⇒ Symbol
Job matching appropriate action.
Instance Attribute Summary collapse
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
Instance Method Summary collapse
-
#after_call ⇒ Object
When redefined can run any code that should run after executing the proper code.
-
#before_call ⇒ Object
When redefined can run any code that should run before executing the proper code.
-
#before_schedule ⇒ Object
When redefined can run any code prior to the job being scheduled.
-
#call ⇒ Object
The main entry-point of a job.
-
#finish! ⇒ Object
Marks the job as finished.
-
#finished? ⇒ Boolean
Was this job finished.
-
#initialize ⇒ Base
constructor
Creates a new job instance.
-
#non_blocking? ⇒ Boolean
Is this a non-blocking job.
-
#wrap(&block) ⇒ Object
Runs the wrap/around job hook within which the rest of the flow happens.
Constructor Details
#initialize ⇒ Base
Creates a new job instance
24 25 26 27 28 29 |
# File 'lib/karafka/processing/jobs/base.rb', line 24 def initialize # All jobs are blocking by default and they can release the lock when blocking operations # are done (if needed) @non_blocking = false @status = :pending end |
Class Attribute Details
.action ⇒ Symbol
Returns Job matching appropriate action.
20 21 22 |
# File 'lib/karafka/processing/jobs/base.rb', line 20 def action @action end |
Instance Attribute Details
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
16 17 18 |
# File 'lib/karafka/processing/jobs/base.rb', line 16 def executor @executor end |
Instance Method Details
#after_call ⇒ Object
When redefined can run any code that should run after executing the proper code
54 |
# File 'lib/karafka/processing/jobs/base.rb', line 54 def after_call; end |
#before_call ⇒ Object
When redefined can run any code that should run before executing the proper code
46 |
# File 'lib/karafka/processing/jobs/base.rb', line 46 def before_call; end |
#before_schedule ⇒ Object
This will run in the listener thread and not in the worker
When redefined can run any code prior to the job being scheduled
41 42 43 |
# File 'lib/karafka/processing/jobs/base.rb', line 41 def before_schedule raise NotImplementedError, 'Please implement in a subclass' end |
#call ⇒ Object
The main entry-point of a job
49 50 51 |
# File 'lib/karafka/processing/jobs/base.rb', line 49 def call raise NotImplementedError, 'Please implement in a subclass' end |
#finish! ⇒ Object
Since the scheduler knows exactly when it schedules jobs and when it keeps them pending, we do not need advanced state tracking and the only information from the “outside” is whether it was finished or not after it was scheduled for execution.
Marks the job as finished. Used by the worker to indicate, that this job is done.
79 80 81 |
# File 'lib/karafka/processing/jobs/base.rb', line 79 def finish! @status = :finished end |
#finished? ⇒ Boolean
Returns was this job finished.
70 71 72 |
# File 'lib/karafka/processing/jobs/base.rb', line 70 def finished? @status == :finished end |
#non_blocking? ⇒ Boolean
Blocking job is a job, that will cause the job queue to wait until it is finished before removing the lock on new jobs being added
All the jobs are blocking by default
Job needs to mark itself as non-blocking only after it is done with all the blocking things (pausing partition, etc).
Returns is this a non-blocking job.
65 66 67 |
# File 'lib/karafka/processing/jobs/base.rb', line 65 def non_blocking? @non_blocking end |
#wrap(&block) ⇒ Object
We inject the action name so user can decide whether to run custom logic on a given action or not.
Runs the wrap/around job hook within which the rest of the flow happens
35 36 37 |
# File 'lib/karafka/processing/jobs/base.rb', line 35 def wrap(&block) executor.wrap(self.class.action, &block) end |