Class: Karafka::TimeTrackers::Pause
- Defined in:
 - lib/karafka/time_trackers/pause.rb
 
Overview
Handles Kafka topic partition pausing and resuming with exponential back-offs. Since expiring and pausing can happen from both the consumer and the listener, this needs to be thread-safe.
We do not have to worry about the performance implications of a mutex wrapping most of the code here, as this is not a frequently used tracker. It is active only once per batch in the case of long-running jobs and upon errors.
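The private backoff_interval helper that #pause uses as its default argument is not shown on this page. As a rough sketch only, assuming the interval doubles with each attempt and is capped at max_timeout, an exponential back-off could be computed as follows. The helper name sketch_backoff_interval and the doubling rule are assumptions made for illustration, not Karafka's confirmed implementation.

```ruby
# Hypothetical helper, not part of Karafka: doubles the base timeout on each
# attempt and caps the result at max_timeout (all values in milliseconds).
def sketch_backoff_interval(timeout:, max_timeout:, exponential_backoff:, attempt:)
  # Without exponential back-off the base timeout is used unchanged
  factor = exponential_backoff ? 2**attempt : 1
  interval = timeout * factor

  # A nil max_timeout means "no cap" in this sketch
  max_timeout && interval > max_timeout ? max_timeout : interval
end

intervals = (0..5).map do |attempt|
  sketch_backoff_interval(
    timeout: 1_000,
    max_timeout: 10_000,
    exponential_backoff: true,
    attempt: attempt
  )
end

p intervals # => [1000, 2000, 4000, 8000, 10000, 10000]
```

With these sample values the interval grows from one second up to the ten-second cap, after which further attempts keep returning the cap.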
Instance Attribute Summary
- #attempt ⇒ Object (readonly)
  Returns the value of attribute attempt.
- #current_timeout ⇒ Object (readonly)
  Returns the value of attribute current_timeout.
 
Instance Method Summary
- #expire ⇒ Object
  Expires the pause, so it can be considered expired.
- #expired? ⇒ Boolean
  Whether the pause has expired.
- #increment ⇒ Object
  Increments the number of attempts by 1.
- #initialize(timeout:, max_timeout:, exponential_backoff:) ⇒ Karafka::TimeTrackers::Pause (constructor)
- #pause(timeout = backoff_interval) ⇒ Object
  Pauses processing from now until the end of the interval (back-off based or explicitly provided).
- #paused? ⇒ Boolean
  Whether processing is currently paused.
- #reset ⇒ Object
  Resets the pause attempt count.
- #resume ⇒ Object
  Marks the pause as resumed.
 
Constructor Details
#initialize(timeout:, max_timeout:, exponential_backoff:) ⇒ Karafka::TimeTrackers::Pause
    # File 'lib/karafka/time_trackers/pause.rb', line 42
    def initialize(timeout:, max_timeout:, exponential_backoff:)
      @started_at = nil
      @attempt = 0
      @timeout = timeout
      @current_timeout = timeout
      @max_timeout = max_timeout
      @exponential_backoff = exponential_backoff
      @mutex = Mutex.new
      super()
    end
  
Instance Attribute Details
#attempt ⇒ Object (readonly)
Returns the value of attribute attempt.
    # File 'lib/karafka/time_trackers/pause.rb', line 13
    def attempt
      @attempt
    end
  
#current_timeout ⇒ Object (readonly)
Returns the value of attribute current_timeout.
    # File 'lib/karafka/time_trackers/pause.rb', line 13
    def current_timeout
      @current_timeout
    end
  
Instance Method Details
#expire ⇒ Object
Expires the pause immediately by clearing its end time, so that #expired? returns true.

    # File 'lib/karafka/time_trackers/pause.rb', line 82
    def expire
      @mutex.synchronize do
        @ends_at = nil
      end
    end
  
#expired? ⇒ Boolean
Returns whether the pause has expired. A tracker with no end time set is treated as expired.

    # File 'lib/karafka/time_trackers/pause.rb', line 96
    def expired?
      @mutex.synchronize do
        @ends_at ? monotonic_now >= @ends_at : true
      end
    end
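The expiry check compares a monotonic timestamp against the stored end time, which keeps it unaffected by wall-clock adjustments. The monotonic_now helper itself is not shown on this page, so the sketch below substitutes a hypothetical sketch_monotonic_now built on Ruby's Process.clock_gettime and assumes millisecond units. Both the helper name and the unit are assumptions.

```ruby
# Assumed stand-in for monotonic_now: monotonic clock reading in milliseconds.
def sketch_monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# Mirrors the expired? logic above: a missing end time counts as expired.
def sketch_expired?(ends_at, now = sketch_monotonic_now)
  ends_at ? now >= ends_at : true
end

started_at = sketch_monotonic_now
ends_at = started_at + 50 # pause window of 50 ms

puts sketch_expired?(ends_at, started_at)      # still inside the window
puts sketch_expired?(ends_at, started_at + 50) # exactly at the deadline
puts sketch_expired?(nil)                      # no deadline recorded
```

Passing the current time as an optional argument is a choice made for this sketch so the boundary cases can be checked without sleeping.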
  
#increment ⇒ Object
Increments the number of attempts by 1.

    # File 'lib/karafka/time_trackers/pause.rb', line 67
    def increment
      @mutex.synchronize do
        @attempt += 1
      end
    end
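To illustrate why the counter update is wrapped in a mutex, here is a minimal standalone sketch in which several threads increment a shared counter. SketchAttemptCounter is a hypothetical class written for this example. It copies only the increment and reset bodies shown on this page and is not the Karafka class itself.

```ruby
# Hypothetical stand-in that reproduces only the attempt-counting part.
class SketchAttemptCounter
  attr_reader :attempt

  def initialize
    @attempt = 0
    @mutex = Mutex.new
  end

  # Read-modify-write guarded by the mutex so concurrent callers do not lose updates
  def increment
    @mutex.synchronize { @attempt += 1 }
  end

  def reset
    @mutex.synchronize { @attempt = 0 }
  end
end

counter = SketchAttemptCounter.new

# Eight threads, each incrementing 1,000 times
threads = Array.new(8) { Thread.new { 1_000.times { counter.increment } } }
threads.each(&:join)

puts counter.attempt # => 8000
counter.reset
puts counter.attempt # => 0
```

Because every update happens inside synchronize, the final count equals the total number of calls regardless of how the threads are scheduled.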
  
#pause(timeout = backoff_interval) ⇒ Object
Pauses processing from now until the end of the interval, which is either the computed back-off interval or an explicitly provided timeout, and stores that interval as current_timeout. In the code below the attempt counter is not changed; it is advanced separately via #increment.
Providing the timeout argument can be useful when we explicitly want to pause for a certain period of time, outside of any regular pausing logic.

    # File 'lib/karafka/time_trackers/pause.rb', line 58
    def pause(timeout = backoff_interval)
      @mutex.synchronize do
        @current_timeout = timeout
        @started_at = monotonic_now
        @ends_at = @started_at + timeout
      end
    end
  
#paused? ⇒ Boolean
Returns whether processing is currently paused.

    # File 'lib/karafka/time_trackers/pause.rb', line 89
    def paused?
      @mutex.synchronize do
        !@started_at.nil?
      end
    end
  
#reset ⇒ Object
Resets the pause attempt count.
    # File 'lib/karafka/time_trackers/pause.rb', line 103
    def reset
      @mutex.synchronize do
        @attempt = 0
      end
    end
  
#resume ⇒ Object
Marks the pause as resumed.
    # File 'lib/karafka/time_trackers/pause.rb', line 74
    def resume
      @mutex.synchronize do
        @started_at = nil
        @ends_at = nil
      end
    end
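Taken together, the methods above track two separate pieces of state: #paused? depends on the start time and #expired? depends on the end time. The following standalone sketch walks through one pause cycle to show how the two flags change. SketchPause and its injectable clock are hypothetical constructs for this example. The method bodies follow the listings on this page, but the class is not Karafka's.

```ruby
# Hypothetical reimplementation of the state handling, with an injectable clock.
class SketchPause
  def initialize(clock:)
    @clock = clock # callable returning the current time in milliseconds
    @started_at = nil
    @ends_at = nil
    @mutex = Mutex.new
  end

  def pause(timeout)
    @mutex.synchronize do
      @started_at = @clock.call
      @ends_at = @started_at + timeout
    end
  end

  def expire
    @mutex.synchronize { @ends_at = nil }
  end

  def resume
    @mutex.synchronize do
      @started_at = nil
      @ends_at = nil
    end
  end

  def paused?
    @mutex.synchronize { !@started_at.nil? }
  end

  def expired?
    @mutex.synchronize { @ends_at ? @clock.call >= @ends_at : true }
  end
end

tracker = SketchPause.new(clock: -> { 0 }) # frozen clock for a deterministic walk-through

p [tracker.paused?, tracker.expired?] # => [false, true]  fresh tracker
tracker.pause(1_000)
p [tracker.paused?, tracker.expired?] # => [true, false]  inside the pause window
tracker.expire
p [tracker.paused?, tracker.expired?] # => [true, true]   expired but not yet resumed
tracker.resume
p [tracker.paused?, tracker.expired?] # => [false, true]  back to the initial state
```

The third step is the one worth noting: after #expire the tracker still reports itself as paused until #resume is called.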