Class: Karafka::BaseConsumer

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Taggable
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit

Constructor Details

#initialize ⇒ BaseConsumer

Creates a new consumer and assigns it an id.



# File 'lib/karafka/base_consumer.rb', line 29

def initialize
  @id = SecureRandom.hex(6)
  @used = false
end
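
The id comes from `SecureRandom.hex(6)`, which encodes 6 random bytes as 12 hexadecimal characters. A minimal sketch of just that behavior, outside Karafka; `SketchConsumer` and its `used?` reader are hypothetical stand-ins for illustration, not part of the library:

```ruby
require 'securerandom'

# Hypothetical stand-in, not Karafka::BaseConsumer itself; it mirrors only the constructor
class SketchConsumer
  attr_reader :id

  def initialize
    @id = SecureRandom.hex(6) # 6 random bytes => 12 hex characters
    @used = false
  end

  # Hypothetical reader, added here only so the flag can be inspected
  def used?
    @used
  end
end

first = SketchConsumer.new
puts first.id.length                # => 12
puts first.id.match?(/\A\h{12}\z/)  # => true
puts first.used?                    # => false
```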

Instance Attribute Details

#client ⇒ Karafka::Connection::Client

Returns the Kafka connection client.

Returns:

  • (Karafka::Connection::Client)

    Kafka connection client


# File 'lib/karafka/base_consumer.rb', line 22

def client
  @client
end

#coordinator ⇒ Karafka::Processing::Coordinator

Returns the coordinator.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator


# File 'lib/karafka/base_consumer.rb', line 24

def coordinator
  @coordinator
end

#id ⇒ String (readonly)

Returns the id of the current consumer.

Returns:

  • (String)

    id of the current consumer



# File 'lib/karafka/base_consumer.rb', line 18

def id
  @id
end

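#messages ⇒ Karafka::Messages::Messages

Returns the batch of messages assigned to this consumer for processing.

Returns:

  • (Karafka::Messages::Messages)

    batch of messages assigned to this consumer
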

# File 'lib/karafka/base_consumer.rb', line 20

def messages
  @messages
end

#producer ⇒ WaterDrop::Producer

Returns the producer instance.

Returns:

  • (WaterDrop::Producer)

    producer instance



# File 'lib/karafka/base_consumer.rb', line 26

def producer
  @producer
end

Instance Method Details

#on_after_consume ⇒ Object

Note:

End users should not call this method. It is part of the consumer lifecycle, not of the public API.

Note:

Errors are handled and reported here because some flows that run at this stage can fail. For example, a DLQ flow fails if it cannot dispatch the DLQ message. Other “non-user” flows do not interact with external systems, and their errors are expected to bubble up.



# File 'lib/karafka/base_consumer.rb', line 89

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end
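
The rescue, instrument, then retry flow above can be sketched without Karafka. Everything below is an assumption made for illustration: `SketchMonitor` records events instead of publishing them, `handle_after_consume` simply raises to simulate a failed DLQ dispatch, and `retry_after_pause` only sets a flag, since its real behavior is not shown on this page.

```ruby
# Hypothetical monitor that records instrumented events in memory
class SketchMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

# Hypothetical stand-in mirroring only the error handling shape of #on_after_consume
class SketchLifecycle
  attr_reader :paused

  def initialize(monitor)
    @monitor = monitor
    @paused = false
  end

  def on_after_consume
    handle_after_consume
  rescue StandardError => e
    # Report the failure first, then fall back to the retry path
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.after_consume.error'
    )

    retry_after_pause
  end

  private

  # Simulates a flow that talks to an external system and fails
  def handle_after_consume
    raise IOError, 'DLQ dispatch failed'
  end

  # Stub: the real method is not documented here, so this only marks the state
  def retry_after_pause
    @paused = true
  end
end

monitor = SketchMonitor.new
lifecycle = SketchLifecycle.new(monitor)
lifecycle.on_after_consume

name, payload = monitor.events.first
puts name                    # => error.occurred
puts payload[:type]          # => consumer.after_consume.error
puts payload[:error].message # => DLQ dispatch failed
puts lifecycle.paused        # => true
```

The error never reaches the caller in this sketch: it is reported once and the retry path runs afterwards.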

#on_before_consume ⇒ Object

Note:

End users should not call this method. It is part of the consumer lifecycle, not of the public API. It can act as a hook when creating non-blocking consumers and for other advanced use cases.

Can be used to run preparation code in the worker.



# File 'lib/karafka/base_consumer.rb', line 51

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end
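
The method stamps the processing time and then freezes the object it stamped, so nothing later in the flow can alter it. A minimal sketch of that stamp-then-freeze idea, assuming the batch exposes a `metadata` object; `SketchMetadata` and `SketchBatch` are hypothetical structs, not Karafka classes:

```ruby
# Hypothetical stand-ins for a message batch and its metadata
SketchMetadata = Struct.new(:topic, :processed_at)
SketchBatch = Struct.new(:metadata)

batch = SketchBatch.new(SketchMetadata.new('events', nil))

# Same two steps as in the method above: stamp, then freeze
batch.metadata.processed_at = Time.now
batch.metadata.freeze

begin
  # Any later write to the frozen struct is rejected
  batch.metadata.processed_at = Time.now
rescue FrozenError => e
  puts "rejected: #{e.class}" # => rejected: FrozenError
end

puts batch.metadata.frozen? # => true
```

Reads such as `batch.metadata.topic` keep working after the freeze; only writes raise.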

#on_before_schedule_consume ⇒ Object

Note:

End users should not call this method. It is part of the consumer lifecycle, not of the public API. It should not perform any expensive operations because it blocks and runs in the listener thread.

Can be used to run preparation code before the job is enqueued.



# File 'lib/karafka/base_consumer.rb', line 40

def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end
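
Each `on_*` trigger on this page delegates to a matching `handle_*` method. A minimal sketch of that delegation, assuming the `handle_*` side is supplied elsewhere (here by a subclass); `SketchBase`, `SketchTracking`, `used?` and `log` are hypothetical names introduced only for this example:

```ruby
# Hypothetical base class that owns the trigger and marks the consumer as used
class SketchBase
  def initialize
    @used = false
  end

  def used?
    @used
  end

  def on_before_schedule_consume
    @used = true
    handle_before_schedule_consume
  end

  private

  # Default no-op handler (an assumption for this sketch)
  def handle_before_schedule_consume; end
end

# Hypothetical subclass that plugs its own behavior into the handler
class SketchTracking < SketchBase
  attr_reader :log

  def initialize
    super
    @log = []
  end

  private

  def handle_before_schedule_consume
    @log << :before_schedule_consume
  end
end

consumer = SketchTracking.new
puts consumer.used?       # => false
consumer.on_before_schedule_consume
puts consumer.used?       # => true
puts consumer.log.inspect # => [:before_schedule_consume]
```

Keeping the trigger thin means bookkeeping such as the `@used` flag stays in one place while the handler can vary.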

#on_before_schedule_eofed ⇒ Object

Can be used to run code before the eofed execution is scheduled.



# File 'lib/karafka/base_consumer.rb', line 104

def on_before_schedule_eofed
  handle_before_schedule_eofed
end

#on_before_schedule_idle ⇒ Object

Can be used to run code before the idle execution is scheduled.



# File 'lib/karafka/base_consumer.rb', line 124

def on_before_schedule_idle
  handle_before_schedule_idle
end

#on_before_schedule_revoked ⇒ Object

Can be used to run code before the revoked execution is scheduled.



# File 'lib/karafka/base_consumer.rb', line 138

def on_before_schedule_revoked
  handle_before_schedule_revoked
end

#on_before_schedule_shutdown ⇒ Object

Can be used to run code before the shutdown execution is scheduled.



# File 'lib/karafka/base_consumer.rb', line 159

def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end

#on_consume ⇒ Boolean

Note:

The seek offset is tracked and used to compensate for asynchronous offset flushing that may not have happened yet when an error occurs. That way the pause always lands on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



# File 'lib/karafka/base_consumer.rb', line 69

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end

#on_eofed ⇒ Object

Trigger method for running on EOF without messages.



# File 'lib/karafka/base_consumer.rb', line 109

def on_eofed
  handle_eofed
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.eofed.error'
  )
end

#on_idle ⇒ Object

Trigger method for running on idle runs without messages.



# File 'lib/karafka/base_consumer.rb', line 131

def on_idle
  handle_idle
end

#on_revoked ⇒ Object

Trigger method for running on partition revocation.



# File 'lib/karafka/base_consumer.rb', line 145

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdown ⇒ Object

Trigger method for running on shutdown.



# File 'lib/karafka/base_consumer.rb', line 166

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end