Class: Karafka::TimeTrackers::Pause
- Defined in:
- lib/karafka/time_trackers/pause.rb
Overview
Handles Kafka topic partition pausing and resuming with exponential back-offs. Since expiring and pausing can happen from both the consumer and the listener, this class needs to be thread-safe.
We do not have to worry about the performance implications of a mutex wrapping most of the code here, as this is not a frequently used tracker. It is active only once per batch in the case of long-running jobs, and upon errors.
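The mutex-based thread safety described in this overview can be illustrated with a minimal sketch. `AttemptCounter` is a hypothetical stand-in, not the Karafka class; it only mirrors the pattern of guarding each state change with a single `Mutex`, and the two threads stand in for the consumer and the listener.

```ruby
# Hypothetical stand-in, not the Karafka class: it only mirrors the
# mutex-guarded counter pattern described in the overview.
class AttemptCounter
  attr_reader :attempt

  def initialize
    @attempt = 0
    @mutex = Mutex.new
  end

  # The read-modify-write is wrapped in the mutex so that concurrent
  # callers cannot interleave between the read and the write.
  def increment
    @mutex.synchronize { @attempt += 1 }
  end

  # Resetting takes the same lock, so it never races with #increment.
  def reset
    @mutex.synchronize { @attempt = 0 }
  end
end

counter = AttemptCounter.new

# Two threads stand in for the consumer and the listener.
threads = 2.times.map do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)

puts counter.attempt # prints 2000
```

Because the lock is taken only on pause-related operations, which the overview describes as infrequent, the cost of synchronizing every method is unlikely to matter in practice.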
Instance Attribute Summary
- #attempt ⇒ Object (readonly)
  Returns the value of attribute attempt.
- #current_timeout ⇒ Object (readonly)
  Returns the value of attribute current_timeout.
Instance Method Summary
- #expire ⇒ Object
  Expires the pause, so it can be considered expired.
- #expired? ⇒ Boolean
  Whether the pause has expired.
- #increment ⇒ Object
  Increments the number of attempts by 1.
- #initialize(timeout:, max_timeout:, exponential_backoff:) ⇒ Karafka::TimeTrackers::Pause (constructor)
- #pause(timeout = backoff_interval) ⇒ Object
  Pauses the processing from now till the end of the interval (backoff or non-backoff) and records the applied timeout as current_timeout.
- #paused? ⇒ Boolean
  Whether processing is currently paused.
- #reset ⇒ Object
  Resets the pause attempt count.
- #resume ⇒ Object
  Marks the pause as resumed.
Constructor Details
#initialize(timeout:, max_timeout:, exponential_backoff:) ⇒ Karafka::TimeTrackers::Pause
# File 'lib/karafka/time_trackers/pause.rb', line 42
def initialize(timeout:, max_timeout:, exponential_backoff:)
  @started_at = nil
  @attempt = 0
  @timeout = timeout
  @current_timeout = timeout
  @max_timeout = max_timeout
  @exponential_backoff = exponential_backoff
  @mutex = Mutex.new
  super()
end
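The constructor stores timeout, max_timeout and exponential_backoff, and #pause defaults its argument to backoff_interval, whose body is not shown on this page. As a rough sketch of how such an interval could be derived from those three values and the attempt count, the hypothetical helper below doubles the base timeout per attempt and caps it at the maximum. Both the helper name `sketch_backoff_interval` and the doubling rule are assumptions made for illustration, not the library's confirmed implementation.

```ruby
# Hypothetical helper; the real private backoff_interval is not shown on
# this page, so the doubling-and-capping rule here is an assumption.
def sketch_backoff_interval(timeout:, max_timeout:, exponential_backoff:, attempt:)
  # Without exponential backoff the base timeout is used unchanged.
  factor = exponential_backoff ? 2**attempt : 1
  interval = timeout * factor

  # Cap the interval only when a maximum is configured.
  max_timeout && interval > max_timeout ? max_timeout : interval
end

(0..4).each do |attempt|
  puts sketch_backoff_interval(
    timeout: 1_000,
    max_timeout: 5_000,
    exponential_backoff: true,
    attempt: attempt
  )
end
# prints 1000, 2000, 4000, 5000, 5000 (one value per line)
```

Under this assumption, calling #increment after each failed attempt would lengthen the next pause, and #reset would bring it back to the base timeout.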
Instance Attribute Details
#attempt ⇒ Object (readonly)
Returns the value of attribute attempt.
# File 'lib/karafka/time_trackers/pause.rb', line 13
def attempt
  @attempt
end
#current_timeout ⇒ Object (readonly)
Returns the value of attribute current_timeout.
# File 'lib/karafka/time_trackers/pause.rb', line 13
def current_timeout
  @current_timeout
end
Instance Method Details
#expire ⇒ Object
Expires the pause, so it can be considered expired.
# File 'lib/karafka/time_trackers/pause.rb', line 82
def expire
  @mutex.synchronize do
    @ends_at = nil
  end
end
#expired? ⇒ Boolean
Returns whether the pause has expired.
# File 'lib/karafka/time_trackers/pause.rb', line 96
def expired?
  @mutex.synchronize do
    @ends_at ? monotonic_now >= @ends_at : true
  end
end
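The expiry check compares monotonic_now against the stored deadline, and treats a missing deadline as already expired. The helper monotonic_now is defined elsewhere and not shown on this page; the sketch below assumes it is a monotonic clock reading in milliseconds and substitutes Ruby's `Process.clock_gettime` for it. The names `sketch_monotonic_now` and `sketch_expired?` are hypothetical.

```ruby
# Assumed stand-in for monotonic_now: a monotonic clock reading in
# milliseconds. The helper's real definition is not part of this page.
def sketch_monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Same rule as #expired?: no deadline means "expired", otherwise the
# clock is compared against the deadline.
def sketch_expired?(ends_at, now = sketch_monotonic_now)
  ends_at ? now >= ends_at : true
end

deadline = sketch_monotonic_now + 50 # 50 ms from now

puts sketch_expired?(deadline) # checked while the deadline is still ahead
sleep(0.06)
puts sketch_expired?(deadline) # checked after the deadline has passed
puts sketch_expired?(nil)      # checked with no deadline recorded
```

A monotonic clock is a natural fit for this kind of check because, unlike wall-clock time, it is not affected by system clock adjustments.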
#increment ⇒ Object
Increments the number of attempts by 1.
# File 'lib/karafka/time_trackers/pause.rb', line 67
def increment
  @mutex.synchronize do
    @attempt += 1
  end
end
#pause(timeout = backoff_interval) ⇒ Object
Pauses the processing from now till the end of the interval (backoff or non-backoff) and records the applied timeout as current_timeout. The attempt counter is not changed here; it is advanced separately via #increment.
Providing an explicit timeout can be useful when we want to pause for a certain period of time, outside of any regular pausing logic.
# File 'lib/karafka/time_trackers/pause.rb', line 58
def pause(timeout = backoff_interval)
  @mutex.synchronize do
    @current_timeout = timeout
    @started_at = monotonic_now
    @ends_at = @started_at + timeout
  end
end
#paused? ⇒ Boolean
Returns whether processing is currently paused.
# File 'lib/karafka/time_trackers/pause.rb', line 89
def paused?
  @mutex.synchronize do
    !@started_at.nil?
  end
end
#reset ⇒ Object
Resets the pause attempt count.
# File 'lib/karafka/time_trackers/pause.rb', line 103
def reset
  @mutex.synchronize do
    @attempt = 0
  end
end
#resume ⇒ Object
Marks the pause as resumed.
# File 'lib/karafka/time_trackers/pause.rb', line 74
def resume
  @mutex.synchronize do
    @started_at = nil
    @ends_at = nil
  end
end
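The listings on this page imply a difference between #expire and #resume: #expire clears only the deadline, so the tracker still reports as paused, while #resume clears both the start and the deadline. The sketch below mirrors that state handling in a hypothetical stand-in class, `PauseSketch`. It is not the Karafka class: the clock is injected to keep the example deterministic, and the mutex is left out for brevity.

```ruby
# Hypothetical stand-in mirroring the state handling shown on this page.
# It is not the Karafka class; the clock is injected and the mutex is
# omitted to keep the example short and deterministic.
class PauseSketch
  def initialize(clock)
    @clock = clock
    @started_at = nil
    @ends_at = nil
  end

  # Records the start and computes the deadline.
  def pause(timeout)
    @started_at = @clock.call
    @ends_at = @started_at + timeout
  end

  # Clears only the deadline; the pause itself stays in place.
  def expire
    @ends_at = nil
  end

  # Clears both markers, ending the pause.
  def resume
    @started_at = nil
    @ends_at = nil
  end

  def paused?
    !@started_at.nil?
  end

  def expired?
    @ends_at ? @clock.call >= @ends_at : true
  end
end

now = 0
tracker = PauseSketch.new(-> { now })

tracker.pause(1_000)
puts [tracker.paused?, tracker.expired?].inspect # prints [true, false]

tracker.expire
puts [tracker.paused?, tracker.expired?].inspect # prints [true, true]

tracker.resume
puts [tracker.paused?, tracker.expired?].inspect # prints [false, true]
```

Read this way, a tracker that is paused and expired is one whose pause is due to be lifted, and #resume is the step that actually lifts it.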