Class: Karafka::Processing::Jobs::Base

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/karafka/processing/jobs/base.rb

Overview

Base class for all job types that are supposed to run in worker threads. Each job can have three main entry points: #before_call, #call and #after_call. Only #call is required.
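The three entry points can be illustrated with a minimal sketch. It assumes a stand-in class (`StandInBase`) that mirrors the methods documented on this page so the snippet runs without Karafka loaded; `TracingJob` and `run_job` are hypothetical names, and `run_job` is only an assumed driver, not Karafka's actual worker code.

```ruby
# Stand-in mirroring the Base methods documented on this page (assumption:
# in a real application you would inherit from
# Karafka::Processing::Jobs::Base instead).
class StandInBase
  def initialize
    @non_blocking = false
    @status = :pending
  end

  def before_call; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end

  def after_call; end

  def finish!
    @status = :finished
  end

  def finished?
    @status == :finished
  end
end

# Hypothetical job that records the order in which its entry points ran
class TracingJob < StandInBase
  attr_reader :trace

  def initialize
    super
    @trace = []
  end

  def before_call
    @trace << :before_call
  end

  def call
    @trace << :call
  end

  def after_call
    @trace << :after_call
  end
end

# Assumed driver: invokes the entry points in order, then marks the job done
def run_job(job)
  job.before_call
  job.call
  job.after_call
  job.finish!
  job
end

job = run_job(TracingJob.new)
```

Because #before_call and #after_call are empty in the base class, a subclass that only defines #call would run through the same driver unchanged.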

Constructor Details

#initialize ⇒ Base

Creates a new job instance



# File 'lib/karafka/processing/jobs/base.rb', line 19

def initialize
  # All jobs are blocking by default and they can release the lock when blocking operations
  # are done (if needed)
  @non_blocking = false
  @status = :pending
end

Instance Attribute Details

#executor ⇒ Object (readonly)

Returns the value of attribute executor.



# File 'lib/karafka/processing/jobs/base.rb', line 16

def executor
  @executor
end

Instance Method Details

#after_call ⇒ Object

When redefined, can run any code that should run after the main job code (#call) has executed.



# File 'lib/karafka/processing/jobs/base.rb', line 41

def after_call; end

#before_call ⇒ Object

When redefined, can run any code that should run before the main job code (#call) executes.



# File 'lib/karafka/processing/jobs/base.rb', line 33

def before_call; end

#before_schedule ⇒ Object

Note:

This runs in the listener thread, not in a worker thread.

When redefined, can run any code before the job is scheduled.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/processing/jobs/base.rb', line 28

def before_schedule
  raise NotImplementedError, 'Please implement in a subclass'
end

#call ⇒ Object

The main entry-point of a job

Raises:

  • (NotImplementedError)


# File 'lib/karafka/processing/jobs/base.rb', line 36

def call
  raise NotImplementedError, 'Please implement in a subclass'
end
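Both #before_schedule and #call raise NotImplementedError in the base class. One detail worth keeping in mind is that Ruby's NotImplementedError descends from ScriptError rather than StandardError, so a plain `rescue StandardError` does not catch it. The sketch below shows this with a stand-in class (`StandInBase`) that copies the two methods from this page; `IncompleteJob` is a hypothetical subclass.

```ruby
# Stand-in copying the two raising methods documented on this page
# (assumption: used here only so the sketch runs without Karafka loaded).
class StandInBase
  def before_schedule
    raise NotImplementedError, 'Please implement in a subclass'
  end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end
end

# Hypothetical subclass that implements #before_schedule but forgets #call
class IncompleteJob < StandInBase
  def before_schedule; end
end

job = IncompleteJob.new
job.before_schedule # overridden, so it no longer raises

caught_as_standard_error = false
caught_explicitly = false

begin
  begin
    job.call
  rescue StandardError
    # Not reached: NotImplementedError is a ScriptError, not a StandardError
    caught_as_standard_error = true
  end
rescue NotImplementedError
  caught_explicitly = true
end
```

In practice this means a missing #call surfaces loudly unless the surrounding code rescues NotImplementedError (or Exception) explicitly.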

#finish! ⇒ Object

Note:

Since the scheduler knows exactly when it schedules jobs and when it keeps them pending, we do not need advanced state tracking and the only information from the “outside” is whether it was finished or not after it was scheduled for execution.

Marks the job as finished. Used by the worker to indicate that this job is done.



# File 'lib/karafka/processing/jobs/base.rb', line 66

def finish!
  @status = :finished
end

#finished? ⇒ Boolean

Returns whether this job has finished.

Returns:

  • (Boolean)

    whether this job has finished.



# File 'lib/karafka/processing/jobs/base.rb', line 57

def finished?
  @status == :finished
end
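The status handling shown in #initialize, #finish! and #finished? amounts to a two-state flag: a job starts as :pending and becomes :finished once #finish! is called. A minimal sketch, assuming a stand-in class (`StandInBase`) that copies those three methods from this page; the filtering at the end is only an illustration of how a caller might use the flag, not Karafka's scheduler code.

```ruby
# Stand-in copying the status-related methods documented on this page
class StandInBase
  def initialize
    @status = :pending
  end

  def finish!
    @status = :finished
  end

  def finished?
    @status == :finished
  end
end

jobs = Array.new(3) { StandInBase.new }

# Mark only the first job as done, as a worker would after running it
jobs.first.finish!

# Hypothetical caller-side check: which jobs are still outstanding?
unfinished = jobs.reject(&:finished?)
```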

#non_blocking? ⇒ Boolean

Note:

A blocking job is a job that causes the job queue to wait until it is finished before removing the lock on new jobs being added.

Note:

All jobs are blocking by default.

Note:

A job must mark itself as non-blocking only after it is done with all the blocking things (pausing partition, etc.).

Returns whether this is a non-blocking job.

Returns:

  • (Boolean)

    whether this is a non-blocking job.



# File 'lib/karafka/processing/jobs/base.rb', line 52

def non_blocking?
  @non_blocking
end
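The notes above suggest a pattern where a job finishes its blocking work first and only then flips the @non_blocking flag. A minimal sketch of that idea, assuming a stand-in class (`StandInBase`) that copies the relevant methods from this page; `LongRunningJob` and its `paused` attribute are hypothetical and only stand in for real blocking work such as pausing a partition.

```ruby
# Stand-in copying the methods documented on this page that matter here
class StandInBase
  def initialize
    # All jobs are blocking by default
    @non_blocking = false
    @status = :pending
  end

  def before_call; end

  def non_blocking?
    @non_blocking
  end
end

# Hypothetical job that does its blocking setup in #before_call and marks
# itself as non-blocking only once that setup is complete
class LongRunningJob < StandInBase
  attr_reader :paused

  def before_call
    super
    @paused = true        # placeholder for the blocking part of the work
    @non_blocking = true  # safe to release the lock from this point on
  end
end

job = LongRunningJob.new
blocking_at_start = !job.non_blocking?
job.before_call
non_blocking_after_setup = job.non_blocking?
```

Setting the flag as the last step of #before_call keeps the job blocking for exactly as long as the blocking setup takes.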