Module: Karafka::Pro::Processing::Strategies::Lrj::FtrMom
- Includes:
- Ftr
- Included in:
- Aj::FtrLrjMom, Aj::FtrLrjMomVp, FtrMomVp
- Defined in:
- lib/karafka/pro/processing/strategies/lrj/ftr_mom.rb
Overview
Long-Running Job enabled Filtering enabled Manual offset management enabled
It is really similar to the Lrj::Ftr but we do not mark anything as consumed
Constant Summary collapse
- FEATURES =
          Features for this strategy 
- %i[ filtering long_running_job manual_offset_management ].freeze 
Constants included from Default
Instance Method Summary collapse
- 
  
    
      #handle_after_consume  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    LRJ standard flow after consumption with potential filtering on success. 
Methods included from Default
#handle_before_schedule_consume, #handle_revoked, #synchronize
Methods included from Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Methods included from Ftr::Default
#handle_idle, #handle_post_filtering
Instance Method Details
#handle_after_consume ⇒ Object
LRJ standard flow after consumption with potential filtering on success
| 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | # File 'lib/karafka/pro/processing/strategies/lrj/ftr_mom.rb', line 27 def handle_after_consume coordinator.on_finished do || if coordinator.success? coordinator.pause_tracker.reset # Manual pausing has the highest priority return if coordinator.manual_pause? # If still not revoked and was throttled, we need to apply filtering logic if coordinator.filtered? && !revoked? handle_post_filtering elsif !revoked? && !coordinator.manual_seek? # If not revoked and not throttled, we move to where we were suppose to and # resume seek(.offset + 1, false, reset_offset: false) resume else resume end else # If processing failed, we need to pause # For long running job this will overwrite the default never-ending pause and # will cause the processing to keep going after the error backoff retry_after_pause end end end |