Class: Karafka::App

Inherits:
Object
Extended by:
Setup::Dsl
Defined in:
lib/karafka/app.rb

Overview

App class

Class Method Summary

Methods included from Setup::Dsl

config, setup

Class Method Details

.assignments ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>

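Returns the current assignments of this process, covering both topics and partitions.

Returns:

  • (Hash<Karafka::Routing::Topic, Array<Integer>>)

    current assignments of this process, both topics and partitions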

56
57
58
# File 'lib/karafka/app.rb', line 56

def assignments
  Instrumentation::AssignmentsTracker.instance.current
end
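
A minimal sketch of how the returned hash might be consumed, based only on the documented shape (topic objects mapped to arrays of partition numbers). `TopicStub` and `describe_assignments` are hypothetical names introduced for illustration; the stub models only the `name` attribute so that the snippet runs without Karafka loaded.

```ruby
# Stand-in for Karafka::Routing::Topic; only #name is modelled here.
TopicStub = Struct.new(:name)

# Hypothetical helper: turns an assignments-shaped hash into readable lines.
def describe_assignments(assignments)
  assignments.map do |topic, partitions|
    "#{topic.name}: partitions #{partitions.sort.join(', ')}"
  end
end

# Sample data in the same shape as the documented return value.
sample = {
  TopicStub.new('orders') => [2, 0, 1],
  TopicStub.new('payments') => [0]
}

puts describe_assignments(sample)
```

In a running Karafka process, the same helper could presumably be given the result of `Karafka::App.assignments` instead of the sample hash.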

.consumer_groups ⇒ Karafka::Routing::Builder
Also known as: routes

Returns the routing builder instance that holds the consumer groups.

Returns:

  • (Karafka::Routing::Builder)

    the consumer groups builder instance

24
25
26
27
28
29
# File 'lib/karafka/app.rb', line 24

def consumer_groups
  config
    .internal
    .routing
    .builder
end

.done? ⇒ Boolean

Note:

This is a meta status derived from the status object.

Returns true if, in general, no further processing should take place.

Returns:

  • (Boolean)

    true if, in general, no further processing should take place



# File 'lib/karafka/app.rb', line 79

def done?
  App.config.internal.status.done?
end

.subscription_groups ⇒ Hash

Returns a hash of the active subscription groups, keyed by their consumer group.

Returns:

  • (Hash)

    active subscription groups, keyed by their consumer group



# File 'lib/karafka/app.rb', line 32

def subscription_groups
  # We first build all the subscription groups, so they all get the same position, despite
  # later narrowing that. It allows us to maintain same position number for static members
  # even when we want to run subset of consumer groups or subscription groups
  #
  # We then narrow this to active consumer groups from which we select active subscription
  # groups.
  consumer_groups
    .map { |cg| [cg, cg.subscription_groups] }
    .select { |cg, _| cg.active? }
    .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } }
    .delete_if { |_, sgs| sgs.empty? }
    .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |top| !top.active? } } }
    .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } }
    .reject { |cg, _| cg.subscription_groups.empty? }
    .to_h
end
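
The narrowing described in the comments above can be illustrated with a simplified, non-mutating sketch. It is not Karafka's implementation: the `Demo*` structs and `active_subscription_groups` are hypothetical stand-ins that model only the `active?` flag and the nesting of topics inside subscription groups inside consumer groups. Unlike the method above, the sketch does not prune inactive topics from the subscription groups it keeps.

```ruby
# Hypothetical stand-ins for the routing objects; only what the
# filtering needs is modelled.
DemoTopic = Struct.new(:name, :active) do
  def active?
    active
  end
end

DemoSubscriptionGroup = Struct.new(:name, :active, :topics) do
  def active?
    active
  end
end

DemoConsumerGroup = Struct.new(:name, :active, :subscription_groups) do
  def active?
    active
  end
end

# Keeps active consumer groups, then active subscription groups that still
# have at least one active topic, and drops consumer groups left with nothing.
def active_subscription_groups(consumer_groups)
  consumer_groups
    .select(&:active?)
    .map do |cg|
      sgs = cg.subscription_groups.select do |sg|
        sg.active? && sg.topics.any?(&:active?)
      end
      [cg, sgs]
    end
    .reject { |_, sgs| sgs.empty? }
    .to_h
end

demo_groups = [
  DemoConsumerGroup.new('a', true, [
    DemoSubscriptionGroup.new('a1', true, [DemoTopic.new('orders', true)]),
    DemoSubscriptionGroup.new('a2', false, [DemoTopic.new('audit', true)])
  ]),
  DemoConsumerGroup.new('b', false, [
    DemoSubscriptionGroup.new('b1', true, [DemoTopic.new('events', true)])
  ]),
  DemoConsumerGroup.new('c', true, [
    DemoSubscriptionGroup.new('c1', true, [DemoTopic.new('legacy', false)])
  ])
]

active_subscription_groups(demo_groups).each do |cg, sgs|
  puts "#{cg.name}: #{sgs.map(&:name).join(', ')}"
end
```

Only consumer group `a` with subscription group `a1` survives here: `a2` is inactive, `b` is inactive as a whole, and `c1` has no active topics left.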

.warmup ⇒ Object

Notifies the Ruby virtual machine that the boot sequence is finished and that now is a good time to optimize the application. On older Ruby versions that lack Process.warmup, it runs GC compaction instead, which is one part of the full warmup introduced in Ruby 3.3. It does nothing when called from a swarm node.



# File 'lib/karafka/app.rb', line 12

def warmup
  # Per recommendation, this should not run in children nodes
  return if Karafka::App.config.swarm.node

  monitor.instrument('app.before_warmup', caller: self)

  return GC.compact unless ::Process.respond_to?(:warmup)

  ::Process.warmup
end
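
The version fallback used above can be tried in isolation with a small standalone sketch. `warmup_vm` is a hypothetical helper, not part of Karafka; it leaves out the swarm check and the instrumentation event, and it adds a guard around `GC.compact` (an assumption made here, for platforms where compaction is unavailable) that the method above does not have.

```ruby
# Hypothetical helper mirroring the fallback logic: prefer Process.warmup
# (Ruby 3.3+), otherwise fall back to heap compaction when it is available.
# Returns a symbol naming the strategy that ran, for illustration only.
def warmup_vm
  if Process.respond_to?(:warmup)
    Process.warmup
    :process_warmup
  elsif GC.respond_to?(:compact)
    GC.compact
    :gc_compact
  else
    :none
  end
end

puts "Warmup strategy on Ruby #{RUBY_VERSION}: #{warmup_vm}"
```

Feature detection via `respond_to?` avoids comparing version strings, which matches the approach taken in the method above.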