Class: Karafka::App
- Inherits:
-
Object
- Object
- Karafka::App
- Extended by:
- Setup::Dsl
- Defined in:
- lib/karafka/app.rb
Overview
App class
Class Method Summary collapse
-
.assignments ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
Returns current assignments of this process.
-
.consumer_groups ⇒ Karafka::Routing::Builder
(also: routes)
Consumers builder instance alias.
-
.debug!(contexts = 'all') ⇒ Object
Forces the debug setup onto Karafka and default WaterDrop producer.
-
.done? ⇒ Boolean
True if we should be done in general with processing anything.
-
.subscription_groups ⇒ Hash
Active subscription groups grouped based on consumer group in a hash.
-
.warmup ⇒ Object
Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application.
Methods included from Setup::Dsl
Class Method Details
.assignments ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>
Returns current assignments of this process. Both topics and partitions
56 57 58 |
# File 'lib/karafka/app.rb', line 56 def assignments Instrumentation::AssignmentsTracker.instance.current end |
.consumer_groups ⇒ Karafka::Routing::Builder Also known as: routes
Returns consumers builder instance alias.
24 25 26 27 28 29 |
# File 'lib/karafka/app.rb', line 24 def consumer_groups config .internal .routing .builder end |
.debug!(contexts = 'all') ⇒ Object
Forces the debug setup onto Karafka and default WaterDrop producer. This needs to run prior to any operations that would cache state, like consuming or producing messages.
104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/karafka/app.rb', line 104 def debug!(contexts = 'all') logger.level = ::Logger::DEBUG producer.config.logger.level = ::Logger::DEBUG config.kafka[:debug] = contexts producer.config.kafka[:debug] = contexts consumer_groups.map(&:topics).flat_map(&:to_a).each do |topic| topic.kafka[:debug] = contexts end end |
.done? ⇒ Boolean
It is a meta status from the status object
Returns true if we should be done in general with processing anything.
79 80 81 |
# File 'lib/karafka/app.rb', line 79 def done? App.config.internal.status.done? end |
.subscription_groups ⇒ Hash
Returns active subscription groups grouped based on consumer group in a hash.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/karafka/app.rb', line 32 def subscription_groups # We first build all the subscription groups, so they all get the same position, despite # later narrowing that. It allows us to maintain same position number for static members # even when we want to run subset of consumer groups or subscription groups # # We then narrow this to active consumer groups from which we select active subscription # groups. consumer_groups .map { |cg| [cg, cg.subscription_groups] } .select { |cg, _| cg.active? } .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } } .delete_if { |_, sgs| sgs.empty? } .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |top| !top.active? } } } .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } } .reject { |cg, _| cg.subscription_groups.empty? } .to_h end |
.warmup ⇒ Object
Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application. In case of older Ruby versions, runs compacting, which is part of the full warmup introduced in Ruby 3.3.
12 13 14 15 16 17 18 19 20 21 |
# File 'lib/karafka/app.rb', line 12 def warmup # Per recommendation, this should not run in children nodes return if Karafka::App.config.swarm.node monitor.instrument('app.before_warmup', caller: self) return GC.compact unless ::Process.respond_to?(:warmup) ::Process.warmup end |