Class: Karafka::Web::Ui::Models::Process
- Inherits:
- Lib::HashProxy
- Inheritance chain: Object > Lib::HashProxy > Karafka::Web::Ui::Models::Process
- Defined in:
- lib/karafka/web/ui/models/process.rb
Overview
Single consumer process representation
Class Method Summary
- .find(state, process_id) ⇒ Process
  Looks for a given process based on its id.
Instance Method Summary
- #consumer_groups ⇒ Array<ConsumerGroup>
  Consumer groups to which this process is subscribed, in alphabetical order.
- #jobs ⇒ Array<Job>
  Jobs sorted from longest running to youngest.
- #lag_hybrid ⇒ Integer
  Collective hybrid lag on this process.
- #pending_jobs_count ⇒ Integer
  Number of pending jobs on a process.
- #running_jobs_count ⇒ Integer
  Number of running jobs on a process.
- #schema_compatible? ⇒ Boolean
  Consumer process data is compatible only if the version of its schema matches the Web UI version of the schema.
- #subscribed? ⇒ Boolean
  True if there are any active subscriptions, otherwise false.
- #subscribed_partitions_count ⇒ Integer
  Number of partitions to which we are currently subscribed.
Methods inherited from Lib::HashProxy
#initialize, #method_missing, #respond_to_missing?, #to_h
Constructor Details
This class inherits a constructor from Karafka::Web::Ui::Lib::HashProxy
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class Karafka::Web::Ui::Lib::HashProxy
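To illustrate what dynamic method handling through method_missing can look like, here is a minimal sketch of a hash-backed proxy. It is not the actual Karafka::Web::Ui::Lib::HashProxy source: the class name SketchHashProxy, its internals, and the sample data are assumptions made for illustration only.

```ruby
# Hypothetical stand-in, NOT the real Karafka::Web::Ui::Lib::HashProxy.
class SketchHashProxy
  def initialize(hash)
    @hash = hash
  end

  # Expose the wrapped hash
  def to_h
    @hash
  end

  # Treat unknown method calls as key lookups on the wrapped hash.
  # Anything that is not a key falls through to the default behavior
  # (which raises NoMethodError).
  def method_missing(method_name, *args, &block)
    return super unless @hash.key?(method_name)

    @hash[method_name]
  end

  # Keep respond_to? consistent with method_missing
  def respond_to_missing?(method_name, include_private = false)
    @hash.key?(method_name) || super
  end
end

# Sample data is made up for the example
process = SketchHashProxy.new(id: 'host:1:abc', status: 'running')
process_id = process.id
process_status = process.status
```

Defining respond_to_missing? alongside method_missing keeps respond_to? truthful about which dynamic readers exist, which is why the two usually appear together.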
Class Method Details
.find(state, process_id) ⇒ Process
Looks for a given process based on its id.
Note: this search only looks within processes that have a compatible schema, as it uses Processes.active under the hood. If no matching process is found, Karafka::Web::Errors::Ui::NotFoundError is raised.
    # File 'lib/karafka/web/ui/models/process.rb', line 17
    def find(state, process_id)
      found_process = Processes.active(state).find { |process| process.id == process_id }
      found_process || raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
    end
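The find-or-raise pattern used by .find can be tried in isolation with the sketch below. SketchProcess, SketchNotFoundError, and sketch_find are hypothetical stand-ins; in the real code the list of candidates comes from Processes.active(state) and the error is Karafka::Web::Errors::Ui::NotFoundError.

```ruby
# Hypothetical stand-ins for illustration only
SketchProcess = Struct.new(:id)

class SketchNotFoundError < StandardError; end

# Returns the first process with a matching id, or raises when none matches
def sketch_find(active_processes, process_id)
  found = active_processes.find { |process| process.id == process_id }
  found || raise(SketchNotFoundError, process_id)
end

# Made-up ids; the real id format is not assumed here
active = [SketchProcess.new('host:1:a'), SketchProcess.new('host:2:b')]
found = sketch_find(active, 'host:2:b')
```

Because only the active list is searched, a process that exists but was filtered out of that list is indistinguishable from one that does not exist: both end in the not-found error.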
Instance Method Details
#consumer_groups ⇒ Array<ConsumerGroup>
Returns the consumer groups to which this process is subscribed, in alphabetical order.
    # File 'lib/karafka/web/ui/models/process.rb', line 25
    def consumer_groups
      super
        .values
        .map { |cg_hash| ConsumerGroup.new(cg_hash) }
        .sort_by(&:id)
    end
#jobs ⇒ Array<Job>
Returns jobs sorted from longest running to youngest.
    # File 'lib/karafka/web/ui/models/process.rb', line 34
    def jobs
      super
        .map { |job| Job.new(job) }
        .sort_by(&:updated_at)
        .then { |jobs| Jobs.new(jobs) }
    end
#lag_hybrid ⇒ Integer
Returns collective hybrid lag on this process.
    # File 'lib/karafka/web/ui/models/process.rb', line 52
    def lag_hybrid
      consumer_groups
        .flat_map(&:subscription_groups)
        .flat_map(&:topics)
        .flat_map(&:partitions)
        .map(&:lag_hybrid)
        .delete_if(&:negative?)
        .sum
    end
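The aggregation above walks the consumer group → subscription group → topic → partition hierarchy, drops negative lag values, and sums the rest. The sketch below reproduces that chain on plain Structs. All Sketch* names and the sample numbers are hypothetical, and why a partition would report a negative lag is not stated in the source, so no meaning is assumed for it here.

```ruby
# Hypothetical stand-ins mirroring the nesting used by the real models
SketchPartition = Struct.new(:lag_hybrid)
SketchTopic = Struct.new(:partitions)
SketchSubscriptionGroup = Struct.new(:topics)
SketchConsumerGroup = Struct.new(:subscription_groups)

# Flattens the hierarchy down to partitions and sums the non-negative lags
def sketch_lag_hybrid(consumer_groups)
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .map(&:lag_hybrid)
    .delete_if(&:negative?)
    .sum
end

# Made-up data: lags of 10, -1 and 5 across two topics
groups = [
  SketchConsumerGroup.new(
    [
      SketchSubscriptionGroup.new(
        [
          SketchTopic.new([SketchPartition.new(10), SketchPartition.new(-1)]),
          SketchTopic.new([SketchPartition.new(5)])
        ]
      )
    ]
  )
]

total_lag = sketch_lag_hybrid(groups) # 10 + 5, the -1 is dropped
```

Filtering before summing means a negative value never reduces the total; a process with no partitions at all simply sums to 0.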
#pending_jobs_count ⇒ Integer
Returns number of pending jobs on a process.
    # File 'lib/karafka/web/ui/models/process.rb', line 47
    def pending_jobs_count
      jobs.pending.count
    end
#running_jobs_count ⇒ Integer
Returns number of running jobs on a process.
    # File 'lib/karafka/web/ui/models/process.rb', line 42
    def running_jobs_count
      jobs.running.count
    end
#schema_compatible? ⇒ Boolean
Returns whether the consumer process data is compatible. It is compatible only if the version of its schema matches the schema version of the Web UI. Some users deploy gradually, rolling out new consumer versions slowly, and during that window we want to indicate which instances have an incompatible schema. This lets us show those processes so users are not confused by them going missing, while blocking all capabilities until every component is fully updated.
Note: We do not differentiate between a process reporting an older or a newer schema than the Web UI Puma instance. Any mismatch is reported as incompatible. This keeps things simple, as the long-term goal for the user is to align the versions anyway.
    # File 'lib/karafka/web/ui/models/process.rb', line 90
    def schema_compatible?
      self[:schema_version] == ::Karafka::Web::Tracking::Consumers::Sampler::SCHEMA_VERSION
    end
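A strict equality check like the one above can be used to split process reports during a gradual rollout. The following is a sketch under assumptions: the constant SKETCH_UI_SCHEMA_VERSION, its value, the helper name, and the sample reports are all hypothetical and do not reflect real Karafka schema versions.

```ruby
# Hypothetical value; the real one lives in
# Karafka::Web::Tracking::Consumers::Sampler::SCHEMA_VERSION
SKETCH_UI_SCHEMA_VERSION = '1.2.3'

# Exact match only: older and newer schemas are both treated as incompatible
def sketch_schema_compatible?(process_data)
  process_data[:schema_version] == SKETCH_UI_SCHEMA_VERSION
end

# Made-up reports from three consumer processes mid-rollout
reports = [
  { id: 'host:1:a', schema_version: '1.2.3' },
  { id: 'host:2:b', schema_version: '1.2.2' },
  { id: 'host:3:c', schema_version: '1.3.0' }
]

compatible, incompatible = reports.partition { |report| sketch_schema_compatible?(report) }
```

Here both the older ('1.2.2') and the newer ('1.3.0') reports land in the incompatible group, which matches the note that the direction of the mismatch is not distinguished.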
#subscribed? ⇒ Boolean
Returns true if there are any active subscriptions, otherwise false.
    # File 'lib/karafka/web/ui/models/process.rb', line 63
    def subscribed?
      return false if consumer_groups.empty?

      consumer_groups.any? do |cg|
        !cg.subscription_groups.empty?
      end
    end
#subscribed_partitions_count ⇒ Integer
Returns number of partitions to which we are currently subscribed.
    # File 'lib/karafka/web/ui/models/process.rb', line 72
    def subscribed_partitions_count
      consumer_groups
        .flat_map(&:subscription_groups)
        .flat_map(&:topics)
        .flat_map(&:partitions)
        .count
    end
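The subscription check and the partition count both operate on the same nested structure, so they can be exercised together. The sketch below uses hypothetical Struct stand-ins (SubTopic, SubSubscriptionGroup, SubConsumerGroup) and made-up partition ids rather than the real Web UI models.

```ruby
# Hypothetical stand-ins for the nested models
SubTopic = Struct.new(:partitions)
SubSubscriptionGroup = Struct.new(:topics)
SubConsumerGroup = Struct.new(:subscription_groups)

# True when at least one consumer group has a subscription group
def sketch_subscribed?(consumer_groups)
  consumer_groups.any? { |cg| !cg.subscription_groups.empty? }
end

# Counts every partition across all groups and topics
def sketch_subscribed_partitions_count(consumer_groups)
  consumer_groups
    .flat_map(&:subscription_groups)
    .flat_map(&:topics)
    .flat_map(&:partitions)
    .count
end

# A process whose only consumer group has no subscription groups
idle = [SubConsumerGroup.new([])]

# A process subscribed to two topics with 3 and 1 partitions
busy = [
  SubConsumerGroup.new(
    [SubSubscriptionGroup.new([SubTopic.new([0, 1, 2]), SubTopic.new([0])])]
  )
]

idle_subscribed = sketch_subscribed?(idle)
busy_subscribed = sketch_subscribed?(busy)
busy_partitions = sketch_subscribed_partitions_count(busy)
```

The sketch omits the early return for an empty list because any? on an empty array already returns false in Ruby; the explicit guard in the original makes the same outcome visible at a glance.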