Class: Karafka::BaseConsumer

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Taggable
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ BaseConsumer

Creates new consumer and assigns it an id



# File 'lib/karafka/base_consumer.rb', line 34

def initialize
  @id = SecureRandom.hex(6)
  @used = false
end
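
A minimal sketch of what the constructor above does, using a hypothetical stand-in class (`SketchConsumer` and its `used?` helper are illustrative and not part of Karafka). `SecureRandom.hex(6)` generates 6 random bytes encoded as 12 hexadecimal characters, so each instance gets its own short id:

```ruby
require 'securerandom'

# Hypothetical stand-in that mirrors only the constructor shown above
class SketchConsumer
  attr_reader :id

  def initialize
    @id = SecureRandom.hex(6) # 6 random bytes => 12 hex characters
    @used = false
  end

  # Illustrative helper, not a Karafka method
  def used?
    @used
  end
end

first = SketchConsumer.new
second = SketchConsumer.new

puts first.id
puts first.id == second.id
```

Two instances created back to back should practically never share an id, which is what makes the value usable for telling consumers apart in logs.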

Instance Attribute Details

#client ⇒ Karafka::Connection::Client

Returns the Kafka connection client.

Returns:

  • (Karafka::Connection::Client)

    Kafka connection client



# File 'lib/karafka/base_consumer.rb', line 27

def client
  @client
end

#coordinator ⇒ Karafka::Processing::Coordinator

Returns the coordinator.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator



# File 'lib/karafka/base_consumer.rb', line 29

def coordinator
  @coordinator
end

#id ⇒ String (readonly)

Returns id of the current consumer.

Returns:

  • (String)

    id of the current consumer



# File 'lib/karafka/base_consumer.rb', line 23

def id
  @id
end

#messages ⇒ Karafka::Routing::Topic

Returns topic to which a given consumer is subscribed.

Returns:



# File 'lib/karafka/base_consumer.rb', line 25

def messages
  @messages
end

#producer ⇒ Waterdrop::Producer

Returns producer instance.

Returns:

  • (Waterdrop::Producer)

    producer instance



# File 'lib/karafka/base_consumer.rb', line 31

def producer
  @producer
end

Instance Method Details

#on_after_consume ⇒ Object

Note:

End users should not call this method. It is part of the internal lifecycle, not the public API.

Note:

Errors are handled and reported here because some flows can fail. For example, a DLQ flow fails if it cannot dispatch the DLQ message. Other “non-user” flows do not interact with external systems, so their errors are expected to bubble up.



# File 'lib/karafka/base_consumer.rb', line 125

def on_after_consume
  handle_after_consume
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end
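
The rescue-and-report shape above can be sketched in isolation. Everything in this example is a hypothetical stand-in: `SketchMonitor` only records events, `SketchAfterConsume` is not a Karafka class, and `retry_after_pause` is stubbed to record that it was called, since its real behavior is not shown in this section. The `seek_offset` payload key is left out for brevity.

```ruby
# Hypothetical monitor that records instrumented events instead of publishing them
class SketchMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(event_name, payload = {})
    @events << [event_name, payload]
  end
end

# Hypothetical consumer reproducing only the rescue-and-report shape
class SketchAfterConsume
  attr_reader :monitor, :retried

  def initialize(monitor)
    @monitor = monitor
    @retried = false
  end

  def on_after_consume
    handle_after_consume
  rescue StandardError => e
    monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.after_consume.error'
    )

    retry_after_pause
  end

  private

  # Simulates a flow that fails, such as a DLQ dispatch that cannot be delivered
  def handle_after_consume
    raise 'DLQ dispatch failed'
  end

  # Stub: only records the call, the real method is not shown in this section
  def retry_after_pause
    @retried = true
  end
end

monitor = SketchMonitor.new
consumer = SketchAfterConsume.new(monitor)
consumer.on_after_consume

puts monitor.events.first[1][:type]
puts consumer.retried
```

The point of the pattern is that the exception never reaches the caller: it is turned into an `error.occurred` event with a `type` that names the failing phase, and the retry step runs afterwards.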

#on_before_consume ⇒ Object

Note:

End users should not call this method. It is part of the internal lifecycle, not the public API. It can act as a hook when creating non-blocking consumers or for other advanced use cases.

Can be used to run preparation code in the worker.



# File 'lib/karafka/base_consumer.rb', line 70

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end
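
As a sketch of why the freeze matters, assume the batch exposes a metadata object with a `processed_at` field. `SketchMetadata` and `stamp_and_freeze` below are hypothetical stand-ins, not Karafka classes or methods. Once the object is frozen, later writes raise `FrozenError`, so user code cannot alter the metadata while it is consuming:

```ruby
# Hypothetical metadata object; the real batch metadata class is not shown here
SketchMetadata = Struct.new(:processed_at)

# Mirrors the two steps above: stamp the processing time, then lock the object
def stamp_and_freeze(metadata)
  metadata.processed_at = Time.now
  metadata.freeze
end

metadata = stamp_and_freeze(SketchMetadata.new)

begin
  # Any write after the freeze is rejected
  metadata.processed_at = Time.now
rescue FrozenError
  puts 'metadata is read-only once consumption starts'
end
```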

#on_before_schedule_consume ⇒ Object

Note:

End users should not call this method. It is part of the internal lifecycle, not the public API. It must not perform any expensive operations because it blocks and runs in the listener thread.

Can be used to run preparation code before the job is enqueued.



# File 'lib/karafka/base_consumer.rb', line 59

def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end

#on_before_schedule_eofed ⇒ Object

Can be used to run code prior to scheduling of eofed execution



# File 'lib/karafka/base_consumer.rb', line 140

def on_before_schedule_eofed
  handle_before_schedule_eofed
end

#on_before_schedule_idle ⇒ Object

Can be used to run code prior to scheduling of idle execution



# File 'lib/karafka/base_consumer.rb', line 160

def on_before_schedule_idle
  handle_before_schedule_idle
end

#on_before_schedule_revoked ⇒ Object

Can be used to run code prior to scheduling of revoked execution



# File 'lib/karafka/base_consumer.rb', line 174

def on_before_schedule_revoked
  handle_before_schedule_revoked
end

#on_before_schedule_shutdown ⇒ Object

Can be used to run code prior to scheduling of shutdown execution



# File 'lib/karafka/base_consumer.rb', line 195

def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end

#on_consume ⇒ Boolean

Note:

The seek offset is tracked and used to compensate for async offset flushing that may not have kicked in yet when an error occurs. That way, the pause always happens on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



# File 'lib/karafka/base_consumer.rb', line 105

def on_consume
  handle_consume
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: seek_offset,
    type: 'consumer.consume.error'
  )
end

#on_eofed ⇒ Object

Trigger method for running on eof without messages



# File 'lib/karafka/base_consumer.rb', line 145

def on_eofed
  handle_eofed
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: seek_offset,
    type: 'consumer.eofed.error'
  )
end

#on_idle ⇒ Object

Trigger method for running on idle runs without messages



# File 'lib/karafka/base_consumer.rb', line 167

def on_idle
  handle_idle
end

#on_initialized ⇒ Object

Trigger method running after consumer is fully initialized.



# File 'lib/karafka/base_consumer.rb', line 42

def on_initialized
  handle_initialized
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.initialized.error'
  )
end

#on_revoked ⇒ Object

Trigger method for running on partition revocation.



# File 'lib/karafka/base_consumer.rb', line 181

def on_revoked
  handle_revoked
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdown ⇒ Object

Trigger method for running on shutdown.



# File 'lib/karafka/base_consumer.rb', line 202

def on_shutdown
  handle_shutdown
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end

#on_wrap(action, &block) ⇒ Object

Executes the default wrapping flow

Parameters:

  • action (Symbol)
  • block (Proc)


# File 'lib/karafka/base_consumer.rb', line 85

def on_wrap(action, &block)
  handle_wrap(action, &block)
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.wrap.error'
  )
end
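
How `handle_wrap` uses the block is not shown in this section. As a sketch only, assuming the block carries the work for the given action and the wrapper runs code around it, an around-style hook could look like the following. `SketchWrapper`, its `log`, and the body of `handle_wrap` are all hypothetical:

```ruby
# Hypothetical class illustrating an around-style wrap hook
class SketchWrapper
  attr_reader :log

  def initialize
    @log = []
  end

  def on_wrap(action, &block)
    handle_wrap(action, &block)
  rescue StandardError => e
    # Stand-in for the monitor call: record the failure instead of raising
    @log << [:error, action, e.message]
  end

  private

  # Assumed behavior: run code before and after the wrapped block
  def handle_wrap(action)
    @log << [:before, action]
    yield
    @log << [:after, action]
  end
end

wrapper = SketchWrapper.new
wrapper.on_wrap(:consume) { wrapper.log << [:work, :consume] }
p wrapper.log
```

Under that assumption, a wrapper of this kind is a natural place for setup and teardown that must surround an action, while failures inside the block are still reported rather than propagated.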