Class: Karafka::Web::Ui::Models::Process

Inherits:
Lib::HashProxy show all
Defined in:
lib/karafka/web/ui/models/process.rb

Overview

Single consumer process representation

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Lib::HashProxy

#initialize, #method_missing, #respond_to_missing?, #to_h

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Lib::HashProxy

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Karafka::Web::Ui::Lib::HashProxy

Class Method Details

.find(state, process_id) ⇒ Process

Looks for a given process based on its id

Parameters:

  • state (State)

    state of the system based on which we will do the lookup

  • process_id (String)

    id of the process we are looking for

Returns:

  • (Process)

    selected process or error raised

Raises:



15
16
17
18
# File 'lib/karafka/web/ui/models/process.rb', line 15

def find(state, process_id)
  found_process = Processes.active(state).find { |process| process.id == process_id }
  found_process || raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
end

Instance Method Details

#consumer_groupsArray<ConsumerGroup>

Returns consumer groups to which this process is subscribed in an alphabetical order.

Returns:

  • (Array<ConsumerGroup>)

    consumer groups to which this process is subscribed in an alphabetical order



23
24
25
26
27
28
# File 'lib/karafka/web/ui/models/process.rb', line 23

def consumer_groups
  super
    .values
    .map { |cg_hash| ConsumerGroup.new(cg_hash) }
    .sort_by(&:id)
end

#jobsArray<Job>

Jobs sorted from longest running to youngest

Returns:

  • (Array<Job>)

    current jobs of this process



32
33
34
35
36
37
# File 'lib/karafka/web/ui/models/process.rb', line 32

def jobs
  super
    .map { |job| Job.new(job) }
    .sort_by(&:updated_at)
    .then { |jobs| Jobs.new(jobs) }
end

#lag_hybridInteger

Returns collective hybrid lag on this process.

Returns:

  • (Integer)

    collective hybrid lag on this process



50
51
52
53
54
55
56
57
58
# File 'lib/karafka/web/ui/models/process.rb', line 50

def lag_hybrid
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .map(&:lag_hybrid)
    .delete_if(&:negative?)
    .sum
end

#pending_jobs_countInteger

Returns number of pending jobs on a process.

Returns:

  • (Integer)

    number of pending jobs on a process



45
46
47
# File 'lib/karafka/web/ui/models/process.rb', line 45

def pending_jobs_count
  jobs.pending.count
end

#running_jobs_countInteger

Returns number of running jobs on a process.

Returns:

  • (Integer)

    number of running jobs on a process



40
41
42
# File 'lib/karafka/web/ui/models/process.rb', line 40

def running_jobs_count
  jobs.running.count
end

#subscribed?Boolean

Returns true if there are any active subscriptions, otherwise false.

Returns:

  • (Boolean)

    true if there are any active subscriptions, otherwise false.



61
62
63
64
65
66
67
# File 'lib/karafka/web/ui/models/process.rb', line 61

def subscribed?
  return false if consumer_groups.empty?

  consumer_groups.any? do |cg|
    !cg.subscription_groups.empty?
  end
end

#subscribed_partitions_countInteger

Returns number of partitions to which we are currently subscribed.

Returns:

  • (Integer)

    number of partitions to which we are currently subscribed



70
71
72
73
74
75
76
# File 'lib/karafka/web/ui/models/process.rb', line 70

def subscribed_partitions_count
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .count
end