Module: Karafka::Processing::Strategies::Default
- Includes:
- Base
- Included in:
- Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
- Defined in:
- lib/karafka/processing/strategies/default.rb
Overview
No features enabled. No manual offset management No long running jobs Nothing. Just standard, automatic flow
Constant Summary collapse
- FEATURES =
Apply strategy for a non-feature based flow
%i[].freeze
Instance Method Summary collapse
-
#commit_offsets(async: true) ⇒ Boolean
Triggers an async offset commit.
-
#commit_offsets! ⇒ Boolean
Triggers a synchronous offsets commit to Kafka.
-
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok.
-
#handle_before_consume ⇒ Object
Increment number of attempts.
-
#handle_consume ⇒ Object
Run the user consumption code.
-
#handle_eofed ⇒ Object
Runs the consumer
#eofed
method with reporting. -
#handle_idle ⇒ Object
Code that should run on idle runs without messages available.
-
#handle_initialized ⇒ Object
Runs the post-creation, post-assignment code.
-
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition.
-
#handle_shutdown ⇒ Object
Runs the shutdown code.
-
#mark_as_consumed(message) ⇒ Boolean
Marks message as consumed in an async way.
-
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
Instance Method Details
#commit_offsets(async: true) ⇒ Boolean
Due to its async nature, this may not fully represent the offset state in some edge cases (like for example going beyond max.poll.interval)
Triggers an async offset commit
94 95 96 97 98 99 100 101 102 |
# File 'lib/karafka/processing/strategies/default.rb', line 94 def commit_offsets(async: true) # Do not commit if we already lost the assignment return false if revoked? return true if client.commit_offsets(async: async) # This will once more check the librdkafka revocation status and will revoke the # coordinator in case it was not revoked revoked? end |
#commit_offsets! ⇒ Boolean
This is fully synchronous, hence the result of this can be used in DB transactions etc as a way of making sure, that we still own the partition.
Triggers a synchronous offsets commit to Kafka
109 110 111 |
# File 'lib/karafka/processing/strategies/default.rb', line 109 def commit_offsets! commit_offsets(async: false) end |
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok. If there was a processing error, we will pause and continue from the next message (next that is +1 from the last one that was successfully marked as consumed)
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/karafka/processing/strategies/default.rb', line 140 def handle_after_consume return if revoked? if coordinator.success? coordinator.pause_tracker.reset # We should not move the offset automatically when the partition was paused # If we would not do this upon a revocation during the pause time, a different process # would pick not from the place where we paused but from the offset that would be # automatically committed here return if coordinator.manual_pause? mark_as_consumed(.last) else retry_after_pause end end |
#handle_before_consume ⇒ Object
Increment number of attempts
114 115 116 |
# File 'lib/karafka/processing/strategies/default.rb', line 114 def handle_before_consume coordinator.pause_tracker.increment end |
#handle_consume ⇒ Object
Run the user consumption code
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/karafka/processing/strategies/default.rb', line 119 def handle_consume Karafka.monitor.instrument('consumer.consume', caller: self) Karafka.monitor.instrument('consumer.consumed', caller: self) do consume end # Mark job as successful coordinator.success!(self) rescue StandardError => e coordinator.failure!(self, e) # Re-raise so reported in the consumer raise e ensure # We need to decrease number of jobs that this coordinator coordinates as it has finished coordinator.decrement(:consume) end |
#handle_eofed ⇒ Object
Runs the consumer #eofed
method with reporting
166 167 168 169 170 171 172 173 |
# File 'lib/karafka/processing/strategies/default.rb', line 166 def handle_eofed Karafka.monitor.instrument('consumer.eof', caller: self) Karafka.monitor.instrument('consumer.eofed', caller: self) do eofed end ensure coordinator.decrement(:eofed) end |
#handle_idle ⇒ Object
Code that should run on idle runs without messages available
159 160 161 162 163 |
# File 'lib/karafka/processing/strategies/default.rb', line 159 def handle_idle nil ensure coordinator.decrement(:idle) end |
#handle_initialized ⇒ Object
It runs in the listener loop. Should not be used for anything heavy or with any potential errors. Mostly for initialization of states, etc.
Runs the post-creation, post-assignment code
37 38 39 40 41 42 |
# File 'lib/karafka/processing/strategies/default.rb', line 37 def handle_initialized Karafka.monitor.instrument('consumer.initialize', caller: self) Karafka.monitor.instrument('consumer.initialized', caller: self) do initialized end end |
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition. Otherwise the underlying librdkafka would not know we may want to continue processing and the pause could in theory last forever
178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/karafka/processing/strategies/default.rb', line 178 def handle_revoked resume coordinator.revoke Karafka.monitor.instrument('consumer.revoke', caller: self) Karafka.monitor.instrument('consumer.revoked', caller: self) do revoked end ensure coordinator.decrement(:revoked) end |
#handle_shutdown ⇒ Object
Runs the shutdown code
192 193 194 195 196 197 198 199 |
# File 'lib/karafka/processing/strategies/default.rb', line 192 def handle_shutdown Karafka.monitor.instrument('consumer.shutting_down', caller: self) Karafka.monitor.instrument('consumer.shutdown', caller: self) do shutdown end ensure coordinator.decrement(:shutdown) end |
#mark_as_consumed(message) ⇒ Boolean
We keep track of this offset in case we would mark as consumed and got error when processing another message. In case like this we do not pause on the message we’ve already processed but rather at the next one. This applies to both sync and async versions of this method.
Marks message as consumed in an async way.
54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/karafka/processing/strategies/default.rb', line 54 def mark_as_consumed() # seek offset can be nil only in case `#seek` was invoked with offset reset request # In case like this we ignore marking return true if coordinator.seek_offset.nil? # Ignore earlier offsets than the one we already committed return true if coordinator.seek_offset > .offset return false if revoked? return revoked? unless client.mark_as_consumed() coordinator.seek_offset = .offset + 1 true end |
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/karafka/processing/strategies/default.rb', line 73 def mark_as_consumed!() # seek offset can be nil only in case `#seek` was invoked with offset reset request # In case like this we ignore marking return true if coordinator.seek_offset.nil? # Ignore earlier offsets than the one we already committed return true if coordinator.seek_offset > .offset return false if revoked? return revoked? unless client.mark_as_consumed!() coordinator.seek_offset = .offset + 1 true end |