Class: Karafka::Web::Ui::Models::Process

Inherits:
Lib::HashProxy
Defined in:
lib/karafka/web/ui/models/process.rb

Overview

Single consumer process representation

Class Method Summary

Instance Method Summary

Methods inherited from Lib::HashProxy

#initialize, #method_missing, #respond_to_missing?, #to_h

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Lib::HashProxy

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Karafka::Web::Ui::Lib::HashProxy
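As a rough illustration of what method_missing-based attribute access looks like, here is a minimal sketch. HashProxySketch is a hypothetical stand-in written for this example; it is not the actual Karafka::Web::Ui::Lib::HashProxy implementation, and the sample keys are assumptions.

```ruby
# Hypothetical stand-in for illustration only, not the real HashProxy.
class HashProxySketch
  def initialize(hash)
    @hash = hash
  end

  # Exposes the wrapped hash
  def to_h
    @hash
  end

  # Resolves unknown methods against the hash keys; anything else
  # falls through to the default NoMethodError.
  def method_missing(method_name, *args, &block)
    return super unless @hash.key?(method_name)

    @hash[method_name]
  end

  # Keeps respond_to? consistent with method_missing
  def respond_to_missing?(method_name, include_private = false)
    @hash.key?(method_name) || super
  end
end

proxy = HashProxySketch.new(name: 'worker-1:10.0.0.5:4f2a9c', status: 'running')
proxy_name = proxy.name
proxy_status = proxy.status
```

Methods such as #consumer_groups and #jobs below call super, which in a design like this would reach the proxied hash value before wrapping it in model objects.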

Class Method Details

.find(state, process_id) ⇒ Process

Looks for a given process based on its id

Parameters:

  • state (State)

    state of the system based on which we will do the lookup

  • process_id (String)

    id of the process we are looking for

Returns:

  • (Process)

    the matching active process; an error is raised when none is found

Raises:

  • (::Karafka::Web::Errors::Ui::NotFoundError)

    when no active process matches the given id


# File 'lib/karafka/web/ui/models/process.rb', line 15

def find(state, process_id)
  found_process = Processes.active(state).find { |process| process.id == process_id }
  found_process || raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
end
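The lookup-or-raise idiom used above can be tried in isolation. In this sketch, ProcessSketch, NotFoundErrorSketch and find_process are hypothetical names, and a plain array stands in for the result of Processes.active(state).

```ruby
# Hypothetical stand-ins; the real method uses Processes.active(state)
# and ::Karafka::Web::Errors::Ui::NotFoundError.
NotFoundErrorSketch = Class.new(StandardError)
ProcessSketch = Struct.new(:id)

def find_process(processes, process_id)
  found = processes.find { |process| process.id == process_id }
  # `||` returns the process when present, otherwise evaluates the raise
  found || raise(NotFoundErrorSketch, process_id)
end

processes = [ProcessSketch.new('abc123'), ProcessSketch.new('def456')]
found = find_process(processes, 'def456')

missing_message = nil
begin
  find_process(processes, 'missing')
rescue NotFoundErrorSketch => e
  # The id passed to raise becomes the error message
  missing_message = e.message
end
```

Passing the id as the second argument to raise makes it the exception message, which may help when the error surfaces in logs.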

Instance Method Details

#consumer_groups ⇒ Array<ConsumerGroup>

Returns consumer groups to which this process is subscribed, sorted alphabetically by id.

Returns:

  • (Array<ConsumerGroup>)

    consumer groups to which this process is subscribed, sorted alphabetically by id



# File 'lib/karafka/web/ui/models/process.rb', line 28

def consumer_groups
  super
    .values
    .map { |cg_hash| ConsumerGroup.new(cg_hash) }
    .sort_by(&:id)
end
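A small sketch of the same values/map/sort_by pipeline on sample data. SketchedConsumerGroup and the shape of the raw hash (keyed by group id) are assumptions made for this example.

```ruby
# Hypothetical model; the real code wraps each hash in ConsumerGroup.
SketchedConsumerGroup = Struct.new(:id)

# Assumed raw shape: a hash keyed by consumer group id
raw_consumer_groups = {
  'payments' => { id: 'payments' },
  'analytics' => { id: 'analytics' }
}

sorted_groups = raw_consumer_groups
  .values
  .map { |cg_hash| SketchedConsumerGroup.new(cg_hash[:id]) }
  .sort_by(&:id)

sorted_group_ids = sorted_groups.map(&:id)
```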

#id ⇒ String

Returns process id without the name and ip.

Returns:

  • (String)

    process id without the name and ip



# File 'lib/karafka/web/ui/models/process.rb', line 22

def id
  @id ||= name.split(':').last
end
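To make the split concrete, here is a minimal sketch. ProcessIdSketch is hypothetical, and the name:ip:id layout of the sample name is an assumption inferred from the description above.

```ruby
# Hypothetical class mirroring only the #id logic shown above.
class ProcessIdSketch
  attr_reader :name

  def initialize(name)
    @name = name
  end

  # Memoized: the split runs once per instance
  def id
    @id ||= name.split(':').last
  end
end

# Sample name with an assumed "name:ip:id" layout
sample_process = ProcessIdSketch.new('worker-1:10.0.0.5:4f2a9c')
sample_id = sample_process.id # => "4f2a9c"
```

A name without any colon would be returned unchanged, since split then yields a single-element array.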

#jobs ⇒ Array<Job>

Returns the current jobs of this process, sorted from longest running to youngest (ascending updated_at) and wrapped in a Jobs collection.

Returns:

  • (Array<Job>)

    current jobs of this process



# File 'lib/karafka/web/ui/models/process.rb', line 37

def jobs
  super
    .map { |job| Job.new(job) }
    .sort_by(&:updated_at)
    .then { |jobs| Jobs.new(jobs) }
end
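The ordering step can be sketched on sample data as follows. JobSketch, the consumer names and the numeric updated_at values are assumptions for illustration; the final wrapping in a Jobs collection is left out.

```ruby
# Hypothetical model; the real code wraps each hash in Job.
JobSketch = Struct.new(:consumer, :updated_at)

# Assumed raw shape with numeric timestamps
raw_jobs = [
  { consumer: 'EventsConsumer', updated_at: 1_700_000_300 },
  { consumer: 'OrdersConsumer', updated_at: 1_700_000_100 },
  { consumer: 'EmailsConsumer', updated_at: 1_700_000_200 }
]

sorted_jobs = raw_jobs
  .map { |job| JobSketch.new(job[:consumer], job[:updated_at]) }
  .sort_by(&:updated_at)

# Smallest updated_at comes first
job_order = sorted_jobs.map(&:consumer)
```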

#lag_hybrid ⇒ Integer

Returns collective hybrid lag on this process.

Returns:

  • (Integer)

    collective hybrid lag on this process



# File 'lib/karafka/web/ui/models/process.rb', line 45

def lag_hybrid
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .map(&:lag_hybrid)
    .delete_if(&:negative?)
    .sum
end
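A minimal sketch of the aggregation on a nested structure. The Lag* structs are hypothetical stand-ins for the real models, and treating a negative lag as "not available" is an assumption about why such values are dropped.

```ruby
# Hypothetical stand-ins for the nested models
LagPartition = Struct.new(:lag_hybrid)
LagTopic = Struct.new(:partitions)
LagSubscriptionGroup = Struct.new(:topics)
LagConsumerGroup = Struct.new(:subscription_groups)

lag_groups = [
  LagConsumerGroup.new(
    [LagSubscriptionGroup.new([LagTopic.new([LagPartition.new(10), LagPartition.new(-1)])])]
  ),
  LagConsumerGroup.new(
    [LagSubscriptionGroup.new([LagTopic.new([LagPartition.new(5)])])]
  )
]

total_lag = lag_groups
  .flat_map(&:subscription_groups)
  .flat_map(&:topics)
  .flat_map(&:partitions)
  .map(&:lag_hybrid)
  .delete_if(&:negative?) # drops -1, assumed here to mean "unknown"
  .sum
```

Here the two valid lags (10 and 5) are summed and the negative one is ignored, so it cannot reduce the total.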

#subscribed? ⇒ Boolean

Returns true if there are any active subscriptions, otherwise false.

Returns:

  • (Boolean)

    true if there are any active subscriptions, otherwise false.



# File 'lib/karafka/web/ui/models/process.rb', line 56

def subscribed?
  return false if consumer_groups.empty?

  consumer_groups.any? do |cg|
    !cg.subscription_groups.empty?
  end
end
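The check can be exercised with a small sketch. SubscribedGroupSketch and the free-standing subscribed? helper are hypothetical. Note that Enumerable#any? already returns false for an empty collection, so the early return in the method above reads as an explicit guard rather than a behavioral requirement.

```ruby
# Hypothetical stand-in for a consumer group model
SubscribedGroupSketch = Struct.new(:subscription_groups)

# Same predicate as above, taking the groups as an argument
def subscribed?(consumer_groups)
  consumer_groups.any? { |cg| !cg.subscription_groups.empty? }
end

no_groups = subscribed?([])
idle_groups = subscribed?([SubscribedGroupSketch.new([])])
active_groups = subscribed?(
  [SubscribedGroupSketch.new([]), SubscribedGroupSketch.new([:sg_0])]
)
```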

#subscribed_partitions_count ⇒ Integer

Returns number of partitions to which we are currently subscribed.

Returns:

  • (Integer)

    number of partitions to which we are currently subscribed



# File 'lib/karafka/web/ui/models/process.rb', line 65

def subscribed_partitions_count
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .count
end
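As a quick illustration of the flattening, this sketch counts partitions across a nested sample. The Count* structs and the partition numbers are hypothetical.

```ruby
# Hypothetical stand-ins for the nested models
CountTopic = Struct.new(:partitions)
CountSubscriptionGroup = Struct.new(:topics)
CountConsumerGroup = Struct.new(:subscription_groups)

count_groups = [
  CountConsumerGroup.new(
    [CountSubscriptionGroup.new([CountTopic.new([0, 1, 2]), CountTopic.new([0])])]
  ),
  CountConsumerGroup.new([]) # a group with no subscriptions adds nothing
]

partitions_count = count_groups
  .flat_map(&:subscription_groups)
  .flat_map(&:topics)
  .flat_map(&:partitions)
  .count
```

Each flat_map removes one level of nesting, so the final count covers every partition regardless of which topic or group it belongs to.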