Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Commands::Seek
- Inherits:
- Base
- Object
- Base
- Karafka::Web::Pro::Commanding::Handlers::Partitions::Commands::Seek
- Defined in:
- lib/karafka/web/pro/commanding/handlers/partitions/commands/seek.rb
Overview
Moves the offset to the position the user requested and, if applicable, also resumes processing.
Instance Method Summary
- #call ⇒ Object
Runs seeking with some extra options if applicable.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Web::Pro::Commanding::Handlers::Partitions::Commands::Base
Instance Method Details
#call ⇒ Object
Runs seeking with some extra options if applicable.
# File 'lib/karafka/web/pro/commanding/handlers/partitions/commands/seek.rb', line 17

    def call
      # If user enabled overtaking prevention and we're already ahead of the requested
      # offset, we should ditch such a request
      if prevent_overtaking? &&
         coordinator.seek_offset &&
         coordinator.seek_offset >= desired_offset
        result('prevented')
        return
      end

      # Mark previous offset as consumed. We move the offset in case the first message
      # after seeking would be a poison pill. That way the offset position is
      # moved even if we get a rebalance later.
      assigned = client.mark_as_consumed!(
        (desired_offset - 1)
      )

      # If we were not able to mark as consumed it means that the assignment was lost
      # We should signal this and stop
      unless assigned
        result('lost_partition')
        return
      end

      client.seek((desired_offset))
      coordinator.seek_offset = desired_offset

      # Clear the attempts. Previous attempts should not count to a changed offset and
      # we should start with a clean slate. That's why we reset the tracker
      coordinator.pause_tracker.reset

      # If there was a pause and if user no longer wants to wait until it expires, we
      # can reset it so the work starts immediately.
      coordinator.pause_tracker.expire if force_resume?

      result('applied')
    end
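The three outcomes of #call ('prevented', 'lost_partition', 'applied') may be easier to follow in isolation. The following is a minimal sketch that mirrors the decision flow of the method above using plain Ruby stand-ins. `PauseTracker`, `Coordinator`, `StubClient`, and `simulate_seek` are hypothetical names introduced only for this illustration; they are not Karafka classes, and the real client and coordinator carry considerably more state.

```ruby
# Hypothetical stand-ins for illustration only; none of these are Karafka classes.
PauseTracker = Struct.new(:attempts, :paused) do
  # Forget previous processing attempts
  def reset
    self.attempts = 0
  end

  # End the current pause immediately
  def expire
    self.paused = false
  end
end

Coordinator = Struct.new(:seek_offset, :pause_tracker)

class StubClient
  attr_reader :marked, :sought

  def initialize(assigned: true)
    @assigned = assigned
  end

  # Returns false when the (simulated) partition assignment was lost
  def mark_as_consumed!(offset)
    @marked = offset if @assigned
    @assigned
  end

  def seek(offset)
    @sought = offset
  end
end

# Mirrors the branching of #call and returns the status string
# instead of reporting it through `result`.
def simulate_seek(client:, coordinator:, desired_offset:,
                  prevent_overtaking: false, force_resume: false)
  # Skip the request if we are already at or past the requested offset
  if prevent_overtaking &&
     coordinator.seek_offset &&
     coordinator.seek_offset >= desired_offset
    return 'prevented'
  end

  # Store the previous offset first so the position survives a later rebalance
  return 'lost_partition' unless client.mark_as_consumed!(desired_offset - 1)

  client.seek(desired_offset)
  coordinator.seek_offset = desired_offset

  # Start from a clean slate at the new position
  coordinator.pause_tracker.reset
  coordinator.pause_tracker.expire if force_resume

  'applied'
end

coordinator = Coordinator.new(100, PauseTracker.new(3, true))
status = simulate_seek(
  client: StubClient.new,
  coordinator: coordinator,
  desired_offset: 50,
  prevent_overtaking: true
)
puts status
```

In this run the coordinator is already at offset 100, so a request to seek back to 50 with overtaking prevention enabled is rejected and nothing is marked or sought. Without that flag the same request would mark offset 49 as consumed, seek to 50, and reset the pause tracker.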