Class: Karafka::BaseConsumer
- Inherits: Object
  - Object
    - Karafka::BaseConsumer
- Extended by: Forwardable
- Includes: Core::Taggable
- Defined in: lib/karafka/base_consumer.rb
Overview
Base consumer from which all Karafka consumers should inherit
Direct Known Subclasses
ActiveJob::Consumer, Pro::RecurringTasks::Consumer, Pro::ScheduledMessages::Consumer
Instance Attribute Summary
- #client ⇒ Karafka::Connection::Client
  Kafka connection client.
- #coordinator ⇒ Karafka::Processing::Coordinator
  Coordinator.
- #id ⇒ String (readonly)
  Id of the current consumer.
- #messages ⇒ Karafka::Routing::Topic
  Topic to which a given consumer is subscribed.
- #producer ⇒ WaterDrop::Producer
  Producer instance.
Instance Method Summary
- #initialize ⇒ BaseConsumer (constructor)
  Creates a new consumer and assigns it an id.
- #on_after_consume ⇒ Object
- #on_before_consume ⇒ Object
  Can be used to run preparation code in the worker.
- #on_before_schedule_consume ⇒ Object
  Can be used to run preparation code prior to the job being enqueued.
- #on_before_schedule_eofed ⇒ Object
  Can be used to run code prior to scheduling of eofed execution.
- #on_before_schedule_idle ⇒ Object
  Can be used to run code prior to scheduling of idle execution.
- #on_before_schedule_revoked ⇒ Object
  Can be used to run code prior to scheduling of revoked execution.
- #on_before_schedule_shutdown ⇒ Object
  Can be used to run code prior to scheduling of shutdown execution.
- #on_consume ⇒ Boolean
  Executes the default consumer flow.
- #on_eofed ⇒ Object
  Trigger method for running on eof without messages.
- #on_idle ⇒ Object
  Trigger method for running on idle runs without messages.
- #on_initialized ⇒ Object
  Trigger method running after consumer is fully initialized.
- #on_revoked ⇒ Object
  Trigger method for running on partition revocation.
- #on_shutdown ⇒ Object
  Trigger method for running on shutdown.
Constructor Details
#initialize ⇒ BaseConsumer
Creates new consumer and assigns it an id
    # File 'lib/karafka/base_consumer.rb', line 31
    def initialize
      @id = SecureRandom.hex(6)
      @used = false
    end
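To illustrate the id format: `SecureRandom.hex(6)` draws 6 random bytes and returns them as a 12-character hexadecimal string. The sketch below uses a hypothetical `IdSketchConsumer` class (not part of Karafka) that only mirrors the constructor above.

```ruby
require 'securerandom'

# Hypothetical stand-in that mirrors the constructor shown above.
class IdSketchConsumer
  attr_reader :id

  def initialize
    @id = SecureRandom.hex(6) # 6 random bytes => 12 hex characters
    @used = false
  end
end

consumer = IdSketchConsumer.new
puts consumer.id.length
```

Because each instance draws its own random bytes, ids are unique per consumer instance with very high probability, which makes them convenient for correlating log entries.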
Instance Attribute Details
#client ⇒ Karafka::Connection::Client
Returns kafka connection client.
    # File 'lib/karafka/base_consumer.rb', line 24
    def client
      @client
    end
#coordinator ⇒ Karafka::Processing::Coordinator
Returns coordinator.
    # File 'lib/karafka/base_consumer.rb', line 26
    def coordinator
      @coordinator
    end
#id ⇒ String (readonly)
Returns id of the current consumer.
    # File 'lib/karafka/base_consumer.rb', line 20
    def id
      @id
    end
#messages ⇒ Karafka::Routing::Topic
Returns topic to which a given consumer is subscribed.
    # File 'lib/karafka/base_consumer.rb', line 22
    def messages
      @messages
    end
#producer ⇒ WaterDrop::Producer
Returns producer instance.
    # File 'lib/karafka/base_consumer.rb', line 28
    def producer
      @producer
    end
Instance Method Details
#on_after_consume ⇒ Object
This should not be used by end users: it is part of the internal lifecycle, not part of the public API.
We handle and report errors here because some of these flows can fail. For example, a DLQ flow could fail if it was not able to dispatch the DLQ message. Other “non-user” flows do not interact with external systems, and their errors are expected to bubble up.
    # File 'lib/karafka/base_consumer.rb', line 105
    def on_after_consume
      handle_after_consume
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        seek_offset: coordinator.seek_offset,
        type: 'consumer.after_consume.error'
      )

      retry_after_pause
    end
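The report-then-recover shape of this method can be sketched in isolation. Everything below is a hypothetical stand-in, not Karafka code: `RecordingMonitor` replaces `Karafka.monitor`, `handle_after_consume` simulates a failing DLQ dispatch, and `retry_after_pause` only counts calls, since this reference does not show what the real method does.

```ruby
# Hypothetical monitor that records events instead of publishing them.
class RecordingMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

# Hypothetical consumer mirroring the shape of #on_after_consume.
class AfterConsumeSketch
  attr_reader :pause_requests

  def initialize(monitor)
    @monitor = monitor
    @pause_requests = 0
  end

  def on_after_consume
    handle_after_consume
  rescue StandardError => e
    # Report first, then fall back to the recovery step.
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.after_consume.error'
    )

    retry_after_pause
  end

  private

  # Stand-in for a post-processing flow that talks to an external system.
  def handle_after_consume
    raise IOError, 'DLQ dispatch failed'
  end

  # Stand-in: only counts how often recovery was requested.
  def retry_after_pause
    @pause_requests += 1
  end
end

monitor = RecordingMonitor.new
sketch = AfterConsumeSketch.new(monitor)
sketch.on_after_consume # the IOError is reported, not raised
puts monitor.events.first.last[:type]
```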
#on_before_consume ⇒ Object
This should not be used by end users: it is part of the internal lifecycle, not part of the public API. It can act as a hook when creating non-blocking consumers and for other advanced use cases.
Can be used to run preparation code in the worker
    # File 'lib/karafka/base_consumer.rb', line 67
    def on_before_consume
      messages.metadata.processed_at = Time.now
      messages.metadata.freeze

      # We run this after the full metadata setup, so we can use all the messages information
      # if needed
      handle_before_consume
    end
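The stamp-then-freeze step can be illustrated with plain Ruby. This is a minimal sketch, assuming a hypothetical `SketchMetadata` struct in place of Karafka's real metadata object: once the processing timestamp is set and the object is frozen, any later write raises `FrozenError`.

```ruby
# Hypothetical stand-in for the batch metadata object; not Karafka's class.
SketchMetadata = Struct.new(:topic, :processed_at)

metadata = SketchMetadata.new('events', nil)
metadata.processed_at = Time.now # stamp once, just before processing
metadata.freeze                  # later writes now raise FrozenError

begin
  metadata.processed_at = Time.now
rescue FrozenError => e
  puts "rejected: #{e.class}"
end
```

Freezing after the last write means user code that runs during processing can read the metadata but cannot change it by accident.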
#on_before_schedule_consume ⇒ Object
This should not be used by end users: it is part of the internal lifecycle, not part of the public API. It should not perform any expensive operations, because it blocks and runs in the listener thread.
Can be used to run preparation code prior to the job being enqueued
    # File 'lib/karafka/base_consumer.rb', line 56
    def on_before_schedule_consume
      @used = true
      handle_before_schedule_consume
    end
#on_before_schedule_eofed ⇒ Object
Can be used to run code prior to scheduling of eofed execution
    # File 'lib/karafka/base_consumer.rb', line 120
    def on_before_schedule_eofed
      handle_before_schedule_eofed
    end
#on_before_schedule_idle ⇒ Object
Can be used to run code prior to scheduling of idle execution
    # File 'lib/karafka/base_consumer.rb', line 140
    def on_before_schedule_idle
      handle_before_schedule_idle
    end
#on_before_schedule_revoked ⇒ Object
Can be used to run code prior to scheduling of revoked execution
    # File 'lib/karafka/base_consumer.rb', line 154
    def on_before_schedule_revoked
      handle_before_schedule_revoked
    end
#on_before_schedule_shutdown ⇒ Object
Can be used to run code prior to scheduling of shutdown execution
    # File 'lib/karafka/base_consumer.rb', line 175
    def on_before_schedule_shutdown
      handle_before_schedule_shutdown
    end
#on_consume ⇒ Boolean
We keep tracking the seek offset and use it to compensate for async offset flushing that may not have kicked in yet when an error occurs. That way we always pause on the last processed message.
Executes the default consumer flow.
    # File 'lib/karafka/base_consumer.rb', line 85
    def on_consume
      handle_consume
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        seek_offset: coordinator.seek_offset,
        type: 'consumer.consume.error'
      )
    end
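The rescue-and-report pattern above can be tried without Karafka. In this minimal sketch, `SketchMonitor` and `ReportingConsumer` are hypothetical stand-ins: the monitor only records events, the seek offset is a plain integer passed in by hand, and `handle_consume` always fails so that the reporting path runs.

```ruby
# Hypothetical monitor that records events instead of publishing them.
class SketchMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

# Hypothetical consumer mirroring the rescue-and-report shape of #on_consume.
class ReportingConsumer
  def initialize(monitor, seek_offset)
    @monitor = monitor
    @seek_offset = seek_offset
  end

  def on_consume
    handle_consume
  rescue StandardError => e
    # The error is reported together with the tracked offset, not re-raised.
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      seek_offset: @seek_offset,
      type: 'consumer.consume.error'
    )
  end

  private

  # Stand-in for the processing flow; always fails in this sketch.
  def handle_consume
    raise ArgumentError, 'bad payload'
  end
end

monitor = SketchMonitor.new
ReportingConsumer.new(monitor, 42).on_consume # does not raise
name, payload = monitor.events.first
puts "#{name} #{payload[:type]} at #{payload[:seek_offset]}"
```

Carrying the seek offset in the event payload lets whoever handles the error know where processing stood, independently of whether offsets had already been flushed.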
#on_eofed ⇒ Object
Trigger method for running on eof without messages
    # File 'lib/karafka/base_consumer.rb', line 125
    def on_eofed
      handle_eofed
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        seek_offset: coordinator.seek_offset,
        type: 'consumer.eofed.error'
      )
    end
#on_idle ⇒ Object
Trigger method for running on idle runs without messages
    # File 'lib/karafka/base_consumer.rb', line 147
    def on_idle
      handle_idle
    end
#on_initialized ⇒ Object
Trigger method running after consumer is fully initialized.
    # File 'lib/karafka/base_consumer.rb', line 39
    def on_initialized
      handle_initialized
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        type: 'consumer.initialized.error'
      )
    end
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
    # File 'lib/karafka/base_consumer.rb', line 161
    def on_revoked
      handle_revoked
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        type: 'consumer.revoked.error'
      )
    end
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
    # File 'lib/karafka/base_consumer.rb', line 182
    def on_shutdown
      handle_shutdown
    rescue StandardError => e
      Karafka.monitor.instrument(
        'error.occurred',
        error: e,
        caller: self,
        type: 'consumer.shutdown.error'
      )
    end