Module: Karafka::Pro::Routing::Features::RecurringTasks::Builder
- Included in: Proxy
- Defined in: lib/karafka/pro/routing/features/recurring_tasks/builder.rb
Overview
Routing extensions for recurring tasks
Instance Method Summary
- #ensure_fugit_availability! ⇒ Object
  Checks if fugit is present.
- #recurring_tasks(active = false, &block) ⇒ Object
  Enables recurring tasks and registers the schedules and logs topics they need in a dedicated consumer group.
Instance Method Details
#ensure_fugit_availability! ⇒ Object
Checks whether fugit is loaded. If it is not, tries to require it, since the gem may be installed but not yet required. If the require fails, raises ::Karafka::Errors::DependencyConstraintsError.
    # File 'lib/karafka/pro/routing/features/recurring_tasks/builder.rb', line 105

    def ensure_fugit_availability!
      return if Object.const_defined?(:Fugit)

      require 'fugit'
    rescue LoadError
      raise(
        ::Karafka::Errors::DependencyConstraintsError,
        <<~ERROR_MSG
          Failed to require fugit gem.
          Add it to your Gemfile, as it is required for the recurring tasks to work.
        ERROR_MSG
      )
    end
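The lazy-require guard above can be tried outside Karafka. The following is a minimal sketch of the same pattern, not the library's code: `MissingDependencyError` and `ensure_available!` are hypothetical names introduced for illustration, and the stdlib `json` library stands in for fugit so the example runs without extra gems.

```ruby
# Hypothetical error class standing in for Karafka's DependencyConstraintsError
class MissingDependencyError < StandardError; end

# Hypothetical helper that makes sure an optional library is loaded.
# const_name - top-level constant the library defines (for example :JSON)
# lib_name   - name passed to require (for example 'json')
def ensure_available!(const_name, lib_name)
  # Nothing to do when the constant is already defined
  return if Object.const_defined?(const_name)

  # The library may be installed but not yet required
  require lib_name
rescue LoadError
  # Translate the low-level load failure into a descriptive error
  raise MissingDependencyError, "Failed to require #{lib_name}. Add it to your Gemfile."
end

# Succeeds: json ships with Ruby
ensure_available!(:JSON, 'json')

# Fails with a descriptive error: this library does not exist
begin
  ensure_available!(:NoSuchLibraryForDemo, 'no_such_library_for_demo')
rescue MissingDependencyError => e
  puts e.message
end
```

Checking the constant first keeps the call cheap when it is repeated, and the method-level `rescue` keeps the failure path in one place.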
#recurring_tasks(active = false, &block) ⇒ Object
Enables recurring tasks and registers the schedules and logs topics they need in a dedicated consumer group.
    # File 'lib/karafka/pro/routing/features/recurring_tasks/builder.rb', line 18

    def recurring_tasks(active = false, &block)
      return unless active

      # We only require zlib when we decide to run recurring tasks because it is not needed
      # otherwise.
      require 'zlib'
      ensure_fugit_availability!

      tasks_cfg = App.config.recurring_tasks
      topics_cfg = tasks_cfg.topics

      consumer_group tasks_cfg.group_id do
        # Registers the primary topic that we use to control schedules execution. This is
        # the one that we use to trigger recurring tasks.
        schedules_topic = topic(topics_cfg.schedules) do
          consumer tasks_cfg.consumer_class
          deserializer tasks_cfg.deserializer
          # Because the topic method name as well as builder proxy method name is the same
          # we need to reference it via target directly
          target.recurring_tasks(true)

          # We manage offsets directly because messages can have both schedules and
          # commands and we need to apply them only when we need to
          manual_offset_management(true)

          # We use multi-batch operations and in-memory state for schedules. This needs to
          # always operate without re-creation.
          consumer_persistence(true)

          # This needs to be enabled for the eof to work correctly
          kafka('enable.partition.eof': true, inherit: true)
          eofed(true)

          # Favour latency. This is a low traffic topic that only accepts user initiated
          # low-frequency commands (1)
          # Since we always react on the received message, we can block for longer periods
          # of time
          max_wait_time(10_000)

          # Since the execution of particular tasks is isolated and guarded, it should not
          # leak. This means, that this is to handle errors like schedule version
          # incompatibility and other errors that will not go away without a redeployment
          pause_timeout(60 * 1_000)
          pause_max_timeout(60 * 1_000)

          # Keep older data for a day and compact to the last state available
          config(
            'cleanup.policy': 'compact,delete',
            'retention.ms': 86_400_000
          )

          # This is the core of execution. Since we're producers of states, we need a way
          # to tick without having new data
          periodic(
            interval: App.config.recurring_tasks.interval,
            during_pause: false,
            during_retry: false
          )

          # If this is the direct schedules redefinition style, we run it
          # The second one (see end of this method) allows for linear reconfiguration of
          # both the topics
          instance_eval(&block) if block && block.arity.zero?
        end

        # This topic is to store logs that we can then inspect either from the admin or via
        # the Web UI
        logs_topic = topic(topics_cfg.logs) do
          active(false)
          deserializer tasks_cfg.deserializer
          target.recurring_tasks(true)

          # Keep cron logs of executions for a week and after that remove. Week should be
          # enough and should not produce too much data.
          config(
            'cleanup.policy': 'delete',
            'retention.ms': 604_800_000
          )
        end

        yield(schedules_topic, logs_topic) if block && block.arity.positive?
      end
    end
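The method accepts its block in two styles and picks one by arity: a block without parameters is evaluated via `instance_eval` while the schedules topic is being defined, and a block with parameters is yielded both topics. The sketch below illustrates only that dispatch in plain Ruby. `DemoTopic` and `configure_topics` are hypothetical stand-ins, not Karafka classes, and the retention values are borrowed from the listing purely as sample data.

```ruby
# Hypothetical stand-in for a topic that is being configured
class DemoTopic
  attr_reader :name, :settings

  def initialize(name)
    @name = name
    @settings = {}
  end

  # DSL-style setter that can be called without a receiver inside instance_eval
  def retention_ms(value)
    @settings[:retention_ms] = value
  end
end

# Hypothetical builder that mirrors the two block styles
def configure_topics(&block)
  schedules = DemoTopic.new('schedules')
  logs = DemoTopic.new('logs')

  # Zero-arity block: evaluate it in the context of the schedules topic
  schedules.instance_eval(&block) if block && block.arity.zero?

  # Block with parameters: hand over both topics for explicit configuration
  yield(schedules, logs) if block && block.arity.positive?

  [schedules, logs]
end

# Style 1: no block parameters, settings apply to the schedules topic
dsl_schedules, = configure_topics { retention_ms(86_400_000) }
puts dsl_schedules.settings.inspect

# Style 2: both topics are passed in and configured explicitly
_, explicit_logs = configure_topics { |_schedules, logs| logs.retention_ms(604_800_000) }
puts explicit_logs.settings.inspect
```

One consequence of this dispatch is that a block taking only a splat (`|*args|`) has a negative arity, so neither branch runs for it.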