Exception: Rdkafka::RdkafkaError
- Defined in:
- lib/rdkafka/error.rb
Overview
Error returned by the underlying rdkafka library.
Direct Known Subclasses
Constant Summary collapse
- EMPTY_HASH =
Empty hash for details default allocation
{}.freeze
Instance Attribute Summary collapse
-
#broker_message ⇒ String
readonly
Error message sent by the broker.
-
#details ⇒ Hash
readonly
Optional details hash specific to a given error or empty hash if none or not supported.
-
#message_prefix ⇒ String
readonly
Prefix to be used for human readable representation.
-
#rdkafka_response ⇒ Integer
readonly
The underlying raw error response.
Class Method Summary collapse
-
.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object
-
.build_fatal(client_ptr, fallback_error_code: -150,, fallback_message: nil) ⇒ RdkafkaError
Build a fatal error from librdkafka’s fatal error state.
-
.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object
-
.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ Object
Instance Method Summary collapse
-
#==(another_error) ⇒ Object
Error comparison.
-
#abortable? ⇒ Boolean
-
#code ⇒ Symbol
This error’s code, for example
:partition_eof,:msg_size_too_large. -
#fatal? ⇒ Boolean
-
#is_partition_eof? ⇒ Boolean
Whether this error indicates the partition is EOF.
-
#retryable? ⇒ Boolean
-
#to_s ⇒ String
Human readable representation of this error.
Instance Attribute Details
#broker_message ⇒ String (readonly)
Error message sent by the broker
22 23 24 |
# File 'lib/rdkafka/error.rb', line 22 def @broker_message end |
#details ⇒ Hash (readonly)
Optional details hash specific to a given error or empty hash if none or not supported
26 27 28 |
# File 'lib/rdkafka/error.rb', line 26 def details @details end |
#message_prefix ⇒ String (readonly)
Prefix to be used for human readable representation
18 19 20 |
# File 'lib/rdkafka/error.rb', line 18 def @message_prefix end |
#rdkafka_response ⇒ Integer (readonly)
The underlying raw error response
14 15 16 |
# File 'lib/rdkafka/error.rb', line 14 def rdkafka_response @rdkafka_response end |
Class Method Details
.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/rdkafka/error.rb', line 51 def build(response_ptr_or_code, = nil, broker_message: nil) case response_ptr_or_code when Integer return false if response_ptr_or_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR new(response_ptr_or_code, , broker_message: ) when Bindings::Message return false if response_ptr_or_code[:err] == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR unless response_ptr_or_code[:payload].null? ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len]) end details = if response_ptr_or_code[:rkt].null? EMPTY_HASH else { partition: response_ptr_or_code[:partition], offset: response_ptr_or_code[:offset], topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt]) }.freeze end new( response_ptr_or_code[:err], , broker_message: , details: details ) else build_from_c(response_ptr_or_code, ) end end |
.build_fatal(client_ptr, fallback_error_code: -150,, fallback_message: nil) ⇒ RdkafkaError
Build a fatal error from librdkafka’s fatal error state. Calls rd_kafka_fatal_error() to get the actual underlying error code and description.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/rdkafka/error.rb', line 109 def build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil) fatal_error_details = Rdkafka::Bindings.extract_fatal_error(client_ptr) if fatal_error_details new( fatal_error_details[:error_code], broker_message: fatal_error_details[:error_string], fatal: true ) else # Fallback: if extract_fatal_error returns nil (shouldn't happen in practice), # the error code itself still indicates a fatal condition new( fallback_error_code, broker_message: , fatal: true ) end end |
.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rdkafka/error.rb', line 29 def build_from_c(response_ptr, = nil, broker_message: nil) code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr) return false if code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR = || Rdkafka::Bindings.rd_kafka_err2str(code) fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero? retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero? abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero? Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr) new( code, , broker_message: , fatal: fatal, retryable: retryable, abortable: abortable ) end |
.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rdkafka/error.rb', line 84 def validate!(response_ptr_or_code, = nil, broker_message: nil, client_ptr: nil) error = build(response_ptr_or_code, , broker_message: ) return false unless error # Auto-detect and handle fatal errors (-150) if error.rdkafka_response == Bindings::RD_KAFKA_RESP_ERR__FATAL && client_ptr # Discover the underlying fatal error from librdkafka error = build_fatal( client_ptr, fallback_error_code: error.rdkafka_response, fallback_message: ) end raise error end |
Instance Method Details
#==(another_error) ⇒ Object
Error comparison
186 187 188 |
# File 'lib/rdkafka/error.rb', line 186 def ==(another_error) another_error.is_a?(self.class) && (self.to_s == another_error.to_s) end |
#abortable? ⇒ Boolean
198 199 200 |
# File 'lib/rdkafka/error.rb', line 198 def abortable? @abortable end |
#code ⇒ Symbol
This error’s code, for example :partition_eof, :msg_size_too_large.
152 153 154 155 156 157 158 159 |
# File 'lib/rdkafka/error.rb', line 152 def code code = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase if code[0] == "_" code[1..-1].to_sym else code.to_sym end end |
#fatal? ⇒ Boolean
190 191 192 |
# File 'lib/rdkafka/error.rb', line 190 def fatal? @fatal end |
#is_partition_eof? ⇒ Boolean
Whether this error indicates the partition is EOF.
181 182 183 |
# File 'lib/rdkafka/error.rb', line 181 def is_partition_eof? code == :partition_eof end |
#retryable? ⇒ Boolean
194 195 196 |
# File 'lib/rdkafka/error.rb', line 194 def retryable? @retryable end |
#to_s ⇒ String
Human readable representation of this error.
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/rdkafka/error.rb', line 163 def to_s = if "#{} - " else '' end err_str = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response) base = "#{}#{err_str} (#{code})" return base if .nil? return base if .empty? "#{base}\n#{}" end |