Class: Karafka::Web::Ui::Models::Process
- Inherits: Lib::HashProxy
  - Object
  - Lib::HashProxy
  - Karafka::Web::Ui::Models::Process
- Defined in: lib/karafka/web/ui/models/process.rb
Overview
Single consumer process representation
Class Method Summary
- .find(state, process_id) ⇒ Process
  Looks for a given process based on its id.
Instance Method Summary
- #consumer_groups ⇒ Array<ConsumerGroup>
  Consumer groups to which this process is subscribed, in alphabetical order.
- #id ⇒ String
  Process id without the name and ip.
- #jobs ⇒ Array<Job>
  Jobs sorted from longest running to youngest.
- #lag_hybrid ⇒ Integer
  Collective hybrid lag on this process.
- #subscribed? ⇒ Boolean
  True if there are any active subscriptions, otherwise false.
- #subscribed_partitions_count ⇒ Integer
  Number of partitions to which we are currently subscribed.
Methods inherited from Lib::HashProxy
#initialize, #method_missing, #respond_to_missing?, #to_h
Constructor Details
This class inherits a constructor from Karafka::Web::Ui::Lib::HashProxy
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class Karafka::Web::Ui::Lib::HashProxy
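To illustrate what dynamic method handling through method_missing means in practice, here is a minimal sketch of a hash proxy. SketchHashProxy and the sample hash are hypothetical; this is not the Lib::HashProxy source, only an assumption about the general pattern implied by the inherited methods listed above.

```ruby
# Hypothetical stand-in for a hash proxy. NOT the actual Lib::HashProxy code.
class SketchHashProxy
  def initialize(hash)
    @hash = hash
  end

  # Returns the wrapped hash
  def to_h
    @hash
  end

  # Exposes hash keys as reader methods; unknown names fall through to the
  # default behavior and raise NoMethodError
  def method_missing(method_name, *args, &block)
    return super unless @hash.key?(method_name)

    @hash.fetch(method_name)
  end

  # Keeps respond_to? consistent with method_missing
  def respond_to_missing?(method_name, include_private = false)
    @hash.key?(method_name) || super
  end
end

proxy = SketchHashProxy.new(name: 'example-host:1234:abcdef', jobs: [])
proxy.name               # value stored under :name
proxy.respond_to?(:jobs) # true, because :jobs is a key of the wrapped hash
```

Under this pattern, a subclass can call super inside a method such as #jobs to obtain the raw value from the hash and then wrap or sort it.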
Class Method Details
.find(state, process_id) ⇒ Process
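Looks for a given process among the active processes based on its id. Raises Karafka::Web::Errors::Ui::NotFoundError when no active process matches.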
    # File 'lib/karafka/web/ui/models/process.rb', line 15
    def find(state, process_id)
      found_process = Processes.active(state).find { |process| process.id == process_id }
      found_process || raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
    end
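The find-or-raise idiom used by .find can be tried in isolation. The sketch below assumes hypothetical stand-ins (SketchProcess, SketchNotFoundError, find_process) in place of the Karafka Web classes and of Processes.active(state); only the lookup logic mirrors the source.

```ruby
# Hypothetical stand-ins; these are not Karafka Web classes.
SketchNotFoundError = Class.new(StandardError)
SketchProcess = Struct.new(:id)

# Returns the first process with a matching id or raises when none matches
def find_process(processes, process_id)
  found_process = processes.find { |process| process.id == process_id }
  found_process || raise(SketchNotFoundError, process_id)
end

processes = [SketchProcess.new('aaa'), SketchProcess.new('bbb')]

found = find_process(processes, 'bbb')

missing_message =
  begin
    find_process(processes, 'zzz')
  rescue SketchNotFoundError => e
    # The missing id is passed as the error message
    e.message
  end
```

Because Enumerable#find returns nil when nothing matches, the `||` branch is what turns a missing process into an error instead of a nil return value.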
Instance Method Details
#consumer_groups ⇒ Array<ConsumerGroup>
Returns consumer groups to which this process is subscribed, sorted alphabetically by id.
    # File 'lib/karafka/web/ui/models/process.rb', line 28
    def consumer_groups
      super
        .values
        .map { |cg_hash| ConsumerGroup.new(cg_hash) }
        .sort_by(&:id)
    end
#id ⇒ String
Returns process id without the name and ip.
    # File 'lib/karafka/web/ui/models/process.rb', line 22
    def id
      @id ||= name.split(':').last
    end
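As a quick illustration of the split used by #id, the sketch below applies it to a made-up process name. The sample values are hypothetical; the only assumption is that the name consists of colon-separated segments with the id last.

```ruby
# Hypothetical process name made of colon-separated segments
name = 'example-host:1234:abcdef'

# The id is whatever follows the last colon
id = name.split(':').last # => "abcdef"

# A name without any colon is returned unchanged
plain_id = 'abcdef'.split(':').last # => "abcdef"
```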
#jobs ⇒ Array<Job>
Returns jobs sorted from longest running to youngest.
    # File 'lib/karafka/web/ui/models/process.rb', line 37
    def jobs
      super
        .map { |job| Job.new(job) }
        .sort_by(&:updated_at)
        .then { |jobs| Jobs.new(jobs) }
    end
#lag_hybrid ⇒ Integer
Returns collective hybrid lag on this process. Partitions reporting a negative hybrid lag are excluded from the sum.
    # File 'lib/karafka/web/ui/models/process.rb', line 45
    def lag_hybrid
      consumer_groups
        .flat_map(&:subscription_groups)
        .flat_map(&:topics)
        .flat_map(&:partitions)
        .map(&:lag_hybrid)
        .delete_if(&:negative?)
        .sum
    end
#subscribed? ⇒ Boolean
Returns true if there are any active subscriptions, otherwise false.
    # File 'lib/karafka/web/ui/models/process.rb', line 56
    def subscribed?
      return false if consumer_groups.empty?

      consumer_groups.any? do |cg|
        !cg.subscription_groups.empty?
      end
    end
#subscribed_partitions_count ⇒ Integer
Returns number of partitions to which we are currently subscribed.
    # File 'lib/karafka/web/ui/models/process.rb', line 65
    def subscribed_partitions_count
      consumer_groups
        .flat_map(&:subscription_groups)
        .flat_map(&:topics)
        .flat_map(&:partitions)
        .count
    end
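The traversal shared by #lag_hybrid, #subscribed? and #subscribed_partitions_count can be sketched on plain data. All Sketch* structs and the sample values below are hypothetical stand-ins, not Karafka Web models; only the nesting order (consumer groups, subscription groups, topics, partitions) is taken from the source above. The sketch uses reject in place of delete_if, which gives the same result here without mutating the mapped array.

```ruby
# Hypothetical stand-ins for the nested structure; not Karafka Web classes.
SketchPartition = Struct.new(:lag_hybrid)
SketchTopic = Struct.new(:partitions)
SketchSubscriptionGroup = Struct.new(:topics)
SketchConsumerGroup = Struct.new(:subscription_groups)

consumer_groups = [
  SketchConsumerGroup.new([
    SketchSubscriptionGroup.new([
      SketchTopic.new([SketchPartition.new(10), SketchPartition.new(-1)]),
      SketchTopic.new([SketchPartition.new(5)])
    ])
  ]),
  # A consumer group without any subscription groups contributes nothing
  SketchConsumerGroup.new([])
]

# Flatten the nesting down to a single list of partitions
partitions = consumer_groups
  .flat_map(&:subscription_groups)
  .flat_map(&:topics)
  .flat_map(&:partitions)

# Sum of hybrid lags, skipping negative values
lag_total = partitions.map(&:lag_hybrid).reject(&:negative?).sum

# Number of subscribed partitions
partition_count = partitions.count

# True as soon as one consumer group has a subscription group
subscribed = consumer_groups.any? { |cg| !cg.subscription_groups.empty? }
```

With this sample data the negative lag of -1 is dropped, so the total is 10 + 5 = 15 across 3 partitions, and the process counts as subscribed.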