Class: Karafka::Web::Ui::Models::Process

Inherits:
Lib::HashProxy
Defined in:
lib/karafka/web/ui/models/process.rb

Overview

Single consumer process representation

Methods inherited from Lib::HashProxy

#initialize, #method_missing, #respond_to_missing?, #to_h

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Lib::HashProxy

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Karafka::Web::Ui::Lib::HashProxy
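
As a rough illustration of what this dynamic handling implies, the sketch below wraps a hash and resolves unknown method calls as key lookups. DemoHashProxy and the sample keys are hypothetical stand-ins, not the actual Lib::HashProxy implementation, which may differ in details.

```ruby
# Hypothetical stand-in for a hash proxy; the real Lib::HashProxy may differ.
class DemoHashProxy
  def initialize(hash)
    @hash = hash
  end

  # Expose the wrapped hash
  def to_h
    @hash
  end

  # Resolve unknown methods as lookups of the matching symbol key
  def method_missing(name, *args, &block)
    return super unless @hash.key?(name)

    @hash[name]
  end

  # Keep respond_to? consistent with method_missing
  def respond_to_missing?(name, include_private = false)
    @hash.key?(name) || super
  end
end

process = DemoHashProxy.new(id: 'host:1:abc', status: 'running')
process.id     # resolved through method_missing
process.status # resolved through method_missing
```

Calling a method that has no matching key still raises NoMethodError, because the sketch delegates to super in that case.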

Class Method Details

.find(state, process_id) ⇒ Process

Note:

Keep in mind that this search looks only within processes with a compatible schema, as it uses Processes.active under the hood.

Looks for a given process based on its id.

Parameters:

  • state (State)

    state of the system based on which we will do the lookup

  • process_id (String)

    id of the process we are looking for

Returns:

  • (Process)

    the selected process; an error is raised if no matching process is found

Raises:

  • (Karafka::Web::Errors::Ui::NotFoundError)

    when no active process with the given id is found

# File 'lib/karafka/web/ui/models/process.rb', line 17

def find(state, process_id)
  found_process = Processes.active(state).find { |process| process.id == process_id }
  found_process || raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
end
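
The find-or-raise pattern above can be tried in isolation. The following is a minimal sketch in which SketchNotFoundError, LookupProcess and find_process are hypothetical stand-ins for the real error class, model and class method, and a plain array plays the role of Processes.active(state).

```ruby
# Hypothetical stand-ins; these names are not part of Karafka Web.
SketchNotFoundError = Class.new(StandardError)
LookupProcess = Struct.new(:id)

# Returns the first process with a matching id or raises with the id as message
def find_process(active_processes, process_id)
  found = active_processes.find { |process| process.id == process_id }
  found || raise(SketchNotFoundError, process_id)
end

active = [LookupProcess.new('a:1'), LookupProcess.new('b:2')]
found_id = find_process(active, 'b:2').id

error_message =
  begin
    find_process(active, 'missing')
    nil
  rescue SketchNotFoundError => e
    e.message # the id that could not be found
  end
```

Because the lookup only sees the collection it is given, a process that exists but is filtered out upstream (here, one that would not be in the active list) is reported as not found.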

Instance Method Details

#consumer_groups ⇒ Array<ConsumerGroup>

Returns the consumer groups to which this process is subscribed, in alphabetical order.

Returns:

  • (Array<ConsumerGroup>)

    consumer groups to which this process is subscribed, in alphabetical order



# File 'lib/karafka/web/ui/models/process.rb', line 25

def consumer_groups
  super
    .values
    .map { |cg_hash| ConsumerGroup.new(cg_hash) }
    .sort_by(&:id)
end
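
The bare super in this method has no superclass method of the same name to call, so Ruby dispatches it to method_missing, which is how the raw data is reached. The sketch below illustrates that mechanism. GroupProxy, GroupedProcess and DemoConsumerGroup are hypothetical, and the shape of the raw data (a hash of hashes keyed by group name) is an assumption inferred from the call to values.

```ruby
# Hypothetical minimal proxy: unknown methods become hash key lookups.
class GroupProxy
  def initialize(hash)
    @hash = hash
  end

  def method_missing(name, *args, &block)
    @hash.key?(name) ? @hash[name] : super
  end

  def respond_to_missing?(name, include_private = false)
    @hash.key?(name) || super
  end
end

DemoConsumerGroup = Struct.new(:id)

class GroupedProcess < GroupProxy
  # `super` finds no parent #consumer_groups, so it falls through to
  # method_missing and returns the raw hash stored under :consumer_groups.
  def consumer_groups
    super
      .values
      .map { |cg_hash| DemoConsumerGroup.new(cg_hash[:id]) }
      .sort_by(&:id)
  end
end

process = GroupedProcess.new(
  consumer_groups: {
    'payments' => { id: 'payments' },
    'analytics' => { id: 'analytics' }
  }
)

sorted_ids = process.consumer_groups.map(&:id)
```

In this sketch the groups come back ordered by id regardless of the order in which they appear in the raw hash.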

#jobs ⇒ Array<Job>

Returns the current jobs of this process, sorted from the longest running to the youngest.

Returns:

  • (Array<Job>)

    current jobs of this process



# File 'lib/karafka/web/ui/models/process.rb', line 34

def jobs
  super
    .map { |job| Job.new(job) }
    .sort_by(&:updated_at)
    .then { |jobs| Jobs.new(jobs) }
end

#lag_hybrid ⇒ Integer

Returns the collective hybrid lag on this process: the sum of lag_hybrid over all subscribed partitions, skipping negative values.

Returns:

  • (Integer)

    collective hybrid lag on this process



# File 'lib/karafka/web/ui/models/process.rb', line 52

def lag_hybrid
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .map(&:lag_hybrid)
    .delete_if(&:negative?)
    .sum
end
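
To make the aggregation concrete, here is a small sketch that walks the same nesting with plain structs. All Lag* names and collective_lag are hypothetical, and the sketch uses reject instead of delete_if, which yields the same result here without mutating the intermediate array.

```ruby
# Hypothetical stand-ins for the nested models.
LagPartition = Struct.new(:lag_hybrid)
LagTopic = Struct.new(:partitions)
LagSubscriptionGroup = Struct.new(:topics)
LagConsumerGroup = Struct.new(:subscription_groups)

# Flattens groups down to partitions, drops negative lags, sums the rest
def collective_lag(consumer_groups)
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .map(&:lag_hybrid)
    .reject(&:negative?)
    .sum
end

groups = [
  LagConsumerGroup.new([
    LagSubscriptionGroup.new([
      LagTopic.new([LagPartition.new(5), LagPartition.new(-1)]),
      LagTopic.new([LagPartition.new(7)])
    ])
  ])
]

total = collective_lag(groups) # 5 + 7; the -1 entry is skipped
```

Without the negative filter the same input would sum to 11, so a single negative partition value would understate the total.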

#pending_jobs_count ⇒ Integer

Returns the number of pending jobs on a process.

Returns:

  • (Integer)

    number of pending jobs on a process



# File 'lib/karafka/web/ui/models/process.rb', line 47

def pending_jobs_count
  jobs.pending.count
end

#running_jobs_count ⇒ Integer

Returns the number of running jobs on a process.

Returns:

  • (Integer)

    number of running jobs on a process



# File 'lib/karafka/web/ui/models/process.rb', line 42

def running_jobs_count
  jobs.running.count
end

#schema_compatible? ⇒ Boolean

Note:

We do not differentiate between a process reporting an older or a newer schema than the Web UI Puma instance. Any mismatch is reported as incompatible. This is for the sake of simplicity, as the long-term goal for the user is to align the versions anyway.

Returns whether the consumer process data is compatible. It is compatible only if the version of its schema matches the Web UI version of the schema. Some users deploy gradually, slowly rolling out new versions of consumers, and during that time we want to indicate which instances have an incompatible schema. This allows us to show those processes, so users are not confused by them missing, while blocking all capabilities until all components are fully updated.

Returns:

  • (Boolean)

    true if the schema version of the consumer process matches the schema version of the Web UI, otherwise false



# File 'lib/karafka/web/ui/models/process.rb', line 90

def schema_compatible?
  self[:schema_version] == ::Karafka::Web::Tracking::Consumers::Sampler::SCHEMA_VERSION
end
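
The strict equality means that both older and newer reporters are treated the same way. A minimal sketch, assuming a made-up UI_SCHEMA_VERSION value and plain hashes in place of process models (the real constant lives in the Sampler class and its value is not shown here):

```ruby
# Hypothetical version value, used only for illustration.
UI_SCHEMA_VERSION = '1.2.0'

# Compatible only on an exact match; no ordering of versions is attempted
def demo_schema_compatible?(process_data)
  process_data[:schema_version] == UI_SCHEMA_VERSION
end

reports = [
  { id: 'older', schema_version: '1.1.0' },
  { id: 'same', schema_version: '1.2.0' },
  { id: 'newer', schema_version: '1.3.0' }
]

compatible_ids = reports
                 .select { |report| demo_schema_compatible?(report) }
                 .map { |report| report[:id] }
```

Only the report with the exact same version passes; the older and the newer one are both flagged as incompatible.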

#subscribed? ⇒ Boolean

Returns true if there are any active subscriptions, otherwise false.

Returns:

  • (Boolean)

    true if there are any active subscriptions, otherwise false.



# File 'lib/karafka/web/ui/models/process.rb', line 63

def subscribed?
  return false if consumer_groups.empty?

  consumer_groups.any? do |cg|
    !cg.subscription_groups.empty?
  end
end
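
A short sketch of the three cases this check distinguishes, using a hypothetical SubGroup struct and a standalone demo_subscribed? helper. Since any? already returns false for an empty array, the guard clause in the original acts as an early exit rather than changing the result, so the sketch omits it.

```ruby
# Hypothetical stand-in for a consumer group model.
SubGroup = Struct.new(:subscription_groups)

# True when at least one consumer group has a subscription group
def demo_subscribed?(consumer_groups)
  consumer_groups.any? { |cg| !cg.subscription_groups.empty? }
end

no_groups = demo_subscribed?([])
only_empty_groups = demo_subscribed?([SubGroup.new([])])
one_active_group = demo_subscribed?([SubGroup.new([]), SubGroup.new([:sg_1])])
```

A process that lists consumer groups but has no subscription groups in any of them is therefore still reported as not subscribed.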

#subscribed_partitions_count ⇒ Integer

Returns the number of partitions to which we are currently subscribed.

Returns:

  • (Integer)

    number of partitions to which we are currently subscribed



# File 'lib/karafka/web/ui/models/process.rb', line 72

def subscribed_partitions_count
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .count
end