Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Commands::Seek

Inherits:
Base
  • Object
Defined in:
lib/karafka/web/pro/commanding/handlers/partitions/commands/seek.rb

Overview

Moves the offset to the position the user requested and, optionally and where applicable, resumes processing.

Instance Method Summary

Methods inherited from Base

#initialize

Constructor Details

This class inherits a constructor from Karafka::Web::Pro::Commanding::Handlers::Partitions::Commands::Base

Instance Method Details

#call ⇒ Object

Runs seeking with some extra options if applicable



# File 'lib/karafka/web/pro/commanding/handlers/partitions/commands/seek.rb', line 17

def call
  # If user enabled overtaking prevention and we're already ahead of the requested
  # offset, we should ditch such a request
  if prevent_overtaking? &&
     coordinator.seek_offset &&
     coordinator.seek_offset >= desired_offset
    result('prevented')

    return
  end

  # Mark previous offset as consumed. We move the offset in case the first message
  # after seeking would be a poison pill. That way the offset position is
  # moved even if we get a rebalance later.
  assigned = client.mark_as_consumed!(
    seek_message(desired_offset - 1)
  )

  # If we were not able to mark as consumed it means that the assignment was lost
  # We should signal this and stop
  unless assigned
    result('lost_partition')

    return
  end

  client.seek(seek_message(desired_offset))

  coordinator.seek_offset = desired_offset
  # Clear the attempts. Previous attempts should not count to a changed offset and
  # we should start with a clean slate. That's why we reset the tracker
  coordinator.pause_tracker.reset
  # If there was a pause and if user no longer wants to wait until it expires, we
  # can reset it so the work starts immediately.
  coordinator.pause_tracker.expire if force_resume?

  result('applied')
end
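
The three outcomes reported by #call ('prevented', 'lost_partition', 'applied') can be illustrated with a minimal stand-alone sketch. It is not Karafka code: the method name `seek_outcome` and its keyword arguments are hypothetical stand-ins for `prevent_overtaking?`, `coordinator.seek_offset`, `desired_offset`, and the return value of `client.mark_as_consumed!`. It models only the decision flow, not the actual seeking or pause handling.

```ruby
# Simplified model of the decision flow in #call.
# All names below are hypothetical and exist only for illustration.
def seek_outcome(current_offset:, desired_offset:, prevent_overtaking:, assignment_held:)
  # Overtaking prevention: ignore the request when the consumer is already
  # at or past the requested offset (a nil current offset never blocks).
  if prevent_overtaking && current_offset && current_offset >= desired_offset
    return 'prevented'
  end

  # Stand-in for mark_as_consumed! returning a falsy value, which the
  # original treats as a lost partition assignment.
  return 'lost_partition' unless assignment_held

  # Otherwise the seek would be performed and the pause tracker reset.
  'applied'
end

seek_outcome(current_offset: 120, desired_offset: 100,
             prevent_overtaking: true, assignment_held: true)  # => "prevented"
seek_outcome(current_offset: 120, desired_offset: 100,
             prevent_overtaking: false, assignment_held: false) # => "lost_partition"
seek_outcome(current_offset: nil, desired_offset: 100,
             prevent_overtaking: true, assignment_held: true)  # => "applied"
```

Note that the overtaking check runs first, so a prevented request never touches the committed offset.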