Class: Karafka::Pro::Processing::Schedulers::Default
- Defined in:
- lib/karafka/pro/processing/schedulers/default.rb
Overview
This is a stateless scheduler, thus we can override the #on_
API.
Optimizes scheduler that takes into consideration of execution time needed to process messages from given topics partitions. It uses the non-preemptive LJF algorithm
This scheduler is designed to optimize execution times on jobs that perform IO operations as when taking IO into consideration, the can achieve optimized parallel processing.
This scheduler can also work with virtual partitions.
Aside from consumption jobs, other jobs do not run often, thus we can leave them with default FIFO scheduler from the default Karafka scheduler
Instance Method Summary collapse
-
#on_clear(_group_id) ⇒ Object
This scheduler does not need to be cleared because it is stateless.
-
#on_manage ⇒ Object
This scheduler does not have anything to manage as it is a pass through and has no state.
-
#on_schedule_consumption(jobs_array) ⇒ Object
Schedules jobs in the LJF order for consumption.
-
#schedule_fifo(jobs_array) ⇒ Object
(also: #on_schedule_revocation, #on_schedule_shutdown, #on_schedule_idle, #on_schedule_periodic, #on_schedule_eofed)
Schedules any jobs provided in a fifo order.
Methods inherited from Base
#clear, #initialize, #manage, #schedule_consumption
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::Schedulers::Base
Instance Method Details
#on_clear(_group_id) ⇒ Object
This scheduler does not need to be cleared because it is stateless
82 83 84 |
# File 'lib/karafka/pro/processing/schedulers/default.rb', line 82 def on_clear(_group_id) nil end |
#on_manage ⇒ Object
This scheduler does not have anything to manage as it is a pass through and has no state
75 76 77 |
# File 'lib/karafka/pro/processing/schedulers/default.rb', line 75 def on_manage nil end |
#on_schedule_consumption(jobs_array) ⇒ Object
Schedules jobs in the LJF order for consumption
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/karafka/pro/processing/schedulers/default.rb', line 37 def on_schedule_consumption(jobs_array) perf_tracker = Instrumentation::PerformanceTracker.instance ordered = [] jobs_array.each do |job| ordered << [ job, processing_cost(perf_tracker, job) ] end ordered.sort_by!(&:last) ordered.reverse! ordered.map!(&:first) ordered.each do |job| @queue << job end end |
#schedule_fifo(jobs_array) ⇒ Object Also known as: on_schedule_revocation, on_schedule_shutdown, on_schedule_idle, on_schedule_periodic, on_schedule_eofed
Schedules any jobs provided in a fifo order
60 61 62 63 64 |
# File 'lib/karafka/pro/processing/schedulers/default.rb', line 60 def schedule_fifo(jobs_array) jobs_array.each do |job| @queue << job end end |