Class: Karafka::Web::Pro::Ui::Controllers::Consumers::Partitions::OffsetsController

Inherits:
BaseController
Defined in:
lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb

Overview

Controller for managing partition offsets in the context of the current consumer process assignments.

Constant Summary

Constants inherited from Ui::Controllers::BaseController

Ui::Controllers::BaseController::Models

Instance Attribute Summary

Attributes inherited from Ui::Controllers::BaseController

#params, #session

Instance Method Summary

Methods inherited from ConsumersController

#details, #index, #performance, #subscriptions

Methods inherited from Ui::Controllers::BaseController

#cache, #initialize

Methods included from Ui::Controllers::Requests::Hookable

included, #run_after_hooks, #run_before_hooks

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Controllers::BaseController

Instance Method Details

#edit(process_id, subscription_group_id, topic, partition_id) ⇒ Object

Displays the offset edit page with the edit form or a warning when not applicable

Parameters:

  • process_id (String)

    ID of the process we’re interested in

  • subscription_group_id (String)
  • topic (String)
  • partition_id (Integer)


# File 'lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb', line 24

def edit(process_id, subscription_group_id, topic, partition_id)
  bootstrap!(process_id, subscription_group_id, topic, partition_id)

  render
end

#update(process_id, subscription_group_id, topic, partition_id) ⇒ Object

Triggers the offset change in the running process via the commanding API

Parameters:

  • process_id (String)

    ID of the process we’re interested in

  • subscription_group_id (String)
  • topic (String)
  • partition_id (Integer)


# File 'lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb', line 36

def update(process_id, subscription_group_id, topic, partition_id)
  edit(process_id, subscription_group_id, topic, partition_id)

  offset = params.int(:offset)
  prevent_overtaking = params.bool(:prevent_overtaking)
  force_resume = params.bool(:force_resume)

  Commanding::Dispatcher.request(
    Commanding::Commands::Partitions::Seek.name,
    process_id,
    {
      consumer_group_id: @consumer_group.id,
      subscription_group_id: @subscription_group.id,
      topic: topic,
      partition_id: partition_id,
      offset: offset,
      prevent_overtaking: prevent_overtaking,
      force_resume: force_resume
    }
  )

  redirect(
    :previous,
    success: format_flash(
      'Initiated offset adjustment to ? for ?#? (subscription group: ?)',
      offset,
      topic,
      partition_id,
      subscription_group_id
    )
  )
end
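
The data flow in #update can be illustrated outside the controller with a minimal, self-contained sketch. It does not use Karafka Web itself: `SeekRequest`, `SEEK_COMMAND`, `coerce_bool`, `build_seek_request` and `fill_placeholders` are hypothetical stand-ins introduced only for this example, and the assumption that `format_flash` fills each `?` in order from its arguments is a guess based on the call above, not confirmed behavior.

```ruby
# Hypothetical stand-ins for illustration only; none of these names
# belong to Karafka Web.
SeekRequest = Struct.new(:command, :process_id, :payload)

# Placeholder value; the real controller uses
# Commanding::Commands::Partitions::Seek.name instead.
SEEK_COMMAND = 'example.seek'

# Assumed coercion rule: a few common truthy form values map to true,
# everything else (including nil) maps to false.
def coerce_bool(raw)
  %w[1 true on yes].include?(raw.to_s.downcase)
end

# Builds the same payload shape that #update passes to the dispatcher.
def build_seek_request(process_id:, consumer_group_id:, subscription_group_id:,
                       topic:, partition_id:, raw_params:)
  SeekRequest.new(
    SEEK_COMMAND,
    process_id,
    {
      consumer_group_id: consumer_group_id,
      subscription_group_id: subscription_group_id,
      topic: topic,
      partition_id: partition_id,
      # Integer() raises on non-numeric input instead of silently returning 0
      offset: Integer(raw_params.fetch('offset')),
      prevent_overtaking: coerce_bool(raw_params['prevent_overtaking']),
      force_resume: coerce_bool(raw_params['force_resume'])
    }
  )
end

# Assumed flash formatting: each "?" is replaced, left to right, by the
# next value. The real format_flash helper may behave differently.
def fill_placeholders(template, *values)
  queue = values.dup
  template.gsub('?') { queue.shift.to_s }
end

request = build_seek_request(
  process_id: 'host:1:abc',
  consumer_group_id: 'cg_example',
  subscription_group_id: 'sg_example',
  topic: 'events',
  partition_id: 3,
  raw_params: { 'offset' => '100', 'prevent_overtaking' => 'true' }
)

message = fill_placeholders(
  'Initiated offset adjustment to ? for ?#? (subscription group: ?)',
  request.payload[:offset],
  request.payload[:topic],
  request.payload[:partition_id],
  request.payload[:subscription_group_id]
)

puts message
```

In this sketch the payload is a plain hash, so it can be inspected or serialized before anything is dispatched. The omitted `force_resume` parameter falls back to `false` under the assumed coercion rule.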