Class: Karafka::Pro::Processing::Schedulers::Default

Inherits:
Base
  • Object
Defined in:
lib/karafka/pro/processing/schedulers/default.rb

Overview

Note:

This is a stateless scheduler, thus we can override the #on_ API.

Optimized scheduler that takes into consideration the execution time needed to process messages from given topic partitions. It uses the non-preemptive LJF (Longest Job First) algorithm.

This scheduler is designed to optimize execution times of jobs that perform IO operations. When IO time is taken into consideration, starting the longest jobs first lets us achieve better parallel processing.

This scheduler can also work with virtual partitions.

Aside from consumption jobs, other jobs do not run often, so we can leave them with the FIFO ordering used by the default Karafka scheduler.
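The effect of LJF ordering on parallel execution can be illustrated with a small simulation. This is a minimal sketch, not Karafka code: the `makespan` helper, the worker count, and the job costs are all hypothetical, and it assumes each job is picked up by whichever worker becomes free first.

```ruby
# Hypothetical helper (not part of Karafka): simulates non-preemptive
# execution where each job, taken in the given order, runs on the worker
# that becomes free first. Returns the time at which all work is done.
def makespan(costs, workers)
  finish_times = Array.new(workers, 0)

  costs.each do |cost|
    # Pick the worker that is free the earliest (lowest index on ties)
    idx = finish_times.index(finish_times.min)
    finish_times[idx] += cost
  end

  finish_times.max
end

# Hypothetical per-job processing costs
costs = [8, 1, 5, 2, 4]

longest_first = costs.sort.reverse # LJF order: [8, 5, 4, 2, 1]
shortest_first = costs.sort        # opposite order: [1, 2, 4, 5, 8]

puts makespan(longest_first, 2)  # => 10
puts makespan(shortest_first, 2) # => 13
```

With two workers, starting the longest jobs first finishes all work at time 10, while starting the shortest jobs first leaves the longest job running alone at the end and finishes at time 13.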

Instance Method Summary

Methods inherited from Base

#clear, #initialize, #manage, #schedule_consumption

Constructor Details

This class inherits a constructor from Karafka::Pro::Processing::Schedulers::Base

Instance Method Details

#on_clear(_group_id) ⇒ Object

This scheduler does not need to be cleared because it is stateless.

Parameters:

  • _group_id (String)

    Subscription group id



# File 'lib/karafka/pro/processing/schedulers/default.rb', line 74

def on_clear(_group_id)
  nil
end

#on_manage ⇒ Object

This scheduler does not have anything to manage, as it is a pass-through and has no state.



# File 'lib/karafka/pro/processing/schedulers/default.rb', line 67

def on_manage
  nil
end

#on_schedule_consumption(jobs_array) ⇒ Object

Schedules consumption jobs in LJF order: jobs with the highest estimated processing cost are enqueued first.

Parameters:

  • jobs_array (Array<Karafka::Processing::Jobs::Consume, Processing::Jobs::ConsumeNonBlocking>)

    jobs for scheduling



# File 'lib/karafka/pro/processing/schedulers/default.rb', line 29

def on_schedule_consumption(jobs_array)
  perf_tracker = Instrumentation::PerformanceTracker.instance

  ordered = []

  jobs_array.each do |job|
    ordered << [
      job,
      processing_cost(perf_tracker, job)
    ]
  end

  ordered.sort_by!(&:last)
  ordered.reverse!
  ordered.map!(&:first)

  ordered.each do |job|
    @queue << job
  end
end
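The ordering step above can be tried in isolation with plain Ruby objects. This is a minimal sketch under stated assumptions: `SketchJob`, `SKETCH_COSTS`, `ljf_order` and `schedule_ljf` are hypothetical names, the cost table stands in for whatever the performance tracker reports, and a plain Array stands in for the jobs queue.

```ruby
# Hypothetical stand-in for a consumption job; not a Karafka class.
SketchJob = Struct.new(:topic, :partition)

# Hypothetical per-topic costs. In the real scheduler the cost comes from
# processing_cost(perf_tracker, job) instead of a static table.
SKETCH_COSTS = { 'orders' => 120, 'clicks' => 5, 'payments' => 40 }.freeze

# Orders jobs from the most to the least expensive (LJF).
# Negating the cost replaces the sort_by! / reverse! pair used above.
def ljf_order(jobs, costs)
  jobs.sort_by { |job| -costs.fetch(job.topic, 0) }
end

# Pushes the jobs into the queue in LJF order and returns the queue.
def schedule_ljf(jobs, costs, queue)
  ljf_order(jobs, costs).each { |job| queue << job }
  queue
end

jobs = [
  SketchJob.new('clicks', 0),
  SketchJob.new('orders', 0),
  SketchJob.new('payments', 1)
]

queue = schedule_ljf(jobs, SKETCH_COSTS, [])
puts queue.map(&:topic).inspect # => ["orders", "payments", "clicks"]
```

Jobs for topics missing from the cost table fall back to a cost of 0 in this sketch and therefore end up last; that fallback is an assumption of the example, not documented behavior.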

#schedule_fifo(jobs_array) ⇒ Object Also known as: on_schedule_revocation, on_schedule_shutdown, on_schedule_idle, on_schedule_periodic, on_schedule_eofed

Schedules any jobs provided in FIFO order, that is, in the order in which they were given.

Parameters:

  • jobs_array

    jobs for scheduling


# File 'lib/karafka/pro/processing/schedulers/default.rb', line 52

def schedule_fifo(jobs_array)
  jobs_array.each do |job|
    @queue << job
  end
end
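The "Also known as" entries mean that several `on_schedule_*` hooks share this single FIFO implementation. A minimal sketch of that pattern is shown below; `SketchFifoScheduler` is a hypothetical class, only two of the aliases are reproduced, and a plain Array is assumed in place of the jobs queue.

```ruby
# Hypothetical class illustrating the aliasing pattern; not Karafka code.
class SketchFifoScheduler
  def initialize(queue)
    @queue = queue
  end

  # Enqueues jobs in the order they were provided
  def schedule_fifo(jobs_array)
    jobs_array.each do |job|
      @queue << job
    end
  end

  # Each alias is another name for the same method body
  alias on_schedule_revocation schedule_fifo
  alias on_schedule_shutdown schedule_fifo
end

queue = []
scheduler = SketchFifoScheduler.new(queue)

scheduler.on_schedule_shutdown(%i[a b])
scheduler.on_schedule_revocation(%i[c])

puts queue.inspect # => [:a, :b, :c]
```

Because the aliases point at the same implementation, every non-consumption job type keeps its arrival order regardless of which hook delivered it.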