Exception: Rdkafka::RdkafkaError

Inherits:
BaseError
  • Object
show all
Defined in:
lib/rdkafka/error.rb

Overview

Error returned by the underlying rdkafka library.

Direct Known Subclasses

RdkafkaTopicPartitionListError

Constant Summary collapse

EMPTY_HASH =

Empty hash for details default allocation

{}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#broker_message ⇒ String (readonly)

Error message sent by the broker

Returns:

  • (String)


22
23
24
# File 'lib/rdkafka/error.rb', line 22

# Error message sent by the broker.
#
# @return [String] the value held in @broker_message
def broker_message = @broker_message

#details ⇒ Hash (readonly)

Optional details hash specific to a given error or empty hash if none or not supported

Returns:

  • (Hash)


26
27
28
# File 'lib/rdkafka/error.rb', line 26

# Optional details hash specific to a given error, or an empty hash if there
# are none or they are not supported.
#
# @return [Hash] the value held in @details
def details = @details

#message_prefix ⇒ String (readonly)

Prefix to be used for human readable representation

Returns:

  • (String)


18
19
20
# File 'lib/rdkafka/error.rb', line 18

# Prefix to be used for the human readable representation.
#
# @return [String] the value held in @message_prefix
def message_prefix = @message_prefix

#rdkafka_response ⇒ Integer (readonly)

The underlying raw error response

Returns:

  • (Integer)


14
15
16
# File 'lib/rdkafka/error.rb', line 14

# The underlying raw error response.
#
# @return [Integer] the value held in @rdkafka_response
def rdkafka_response = @rdkafka_response

Class Method Details

.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/rdkafka/error.rb', line 51

# Builds an error from one of three kinds of input, dispatching on its type:
#
# * Integer           - treated as a raw response code
# * Bindings::Message - the code is read from the message's :err field
# * anything else     - handed to build_from_c (expected to be a pointer to a
#                       C error object)
#
# @param response_ptr_or_code [Integer, Bindings::Message, Object] response
#   code, message struct or error pointer
# @param message_prefix [String, nil] prefix for the human readable
#   representation; for a Bindings::Message with a non-null payload the
#   payload string is used when no prefix is given
# @param broker_message [String, nil] error message sent by the broker
# @return [RdkafkaError, false] false when the Integer or Message input carries
#   RD_KAFKA_RESP_ERR_NO_ERROR, otherwise the built error (for other inputs,
#   whatever build_from_c returns)
def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  case response_ptr_or_code
  when Integer
    return false if response_ptr_or_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

    new(response_ptr_or_code, message_prefix, broker_message: broker_message)
  when Bindings::Message
    message = response_ptr_or_code
    return false if message[:err] == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

    # The payload only serves as a prefix when the caller did not supply one.
    unless message[:payload].null?
      message_prefix ||= message[:payload].read_string(message[:len])
    end

    # Topic/partition/offset details are only available when the message
    # carries a topic handle.
    details = if message[:rkt].null?
                EMPTY_HASH
              else
                {
                  partition: message[:partition],
                  offset: message[:offset],
                  topic: Bindings.rd_kafka_topic_name(message[:rkt])
                }.freeze
              end

    new(
      message[:err],
      message_prefix,
      broker_message: broker_message,
      details: details
    )
  else
    # Forward broker_message here too, as the other branches do; previously it
    # was silently dropped on this path.
    build_from_c(response_ptr_or_code, message_prefix, broker_message: broker_message)
  end
end

.build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil) ⇒ RdkafkaError

Build a fatal error from librdkafka’s fatal error state. Calls rd_kafka_fatal_error() to get the actual underlying error code and description.

Parameters:

  • client_ptr (FFI::Pointer)

    Pointer to rd_kafka_t client

  • fallback_error_code (Integer) (defaults to: -150)

    Error code to use if no fatal error found (default: -150)

  • fallback_message (String, nil) (defaults to: nil)

    Message to use if no fatal error found

Returns:

  • (RdkafkaError)

    Error object with fatal flag set to true



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/rdkafka/error.rb', line 109

# Builds a fatal error from librdkafka's fatal error state, as reported by
# Rdkafka::Bindings.extract_fatal_error for the given client.
#
# @param client_ptr [FFI::Pointer] pointer to the rd_kafka_t client
# @param fallback_error_code [Integer] error code to use if no fatal error is
#   found (default: -150)
# @param fallback_message [String, nil] message to use if no fatal error is
#   found
# @return [RdkafkaError] error object built with fatal: true
def build_fatal(client_ptr, fallback_error_code: -150, fallback_message: nil)
  extracted = Rdkafka::Bindings.extract_fatal_error(client_ptr)

  # When nothing could be extracted (not expected in practice) the fallback
  # values are used; the error is still flagged as fatal.
  error_code, description =
    if extracted
      [extracted[:error_code], extracted[:error_string]]
    else
      [fallback_error_code, fallback_message]
    end

  new(error_code, broker_message: description, fatal: true)
end

.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rdkafka/error.rb', line 29

# Builds an error from a pointer to a C error object.
#
# Reads the code and the fatal / retriable / requires-abort flags through the
# bindings, then calls rd_kafka_error_destroy on the pointer before
# constructing the Ruby error.
#
# @param response_ptr [Object] pointer passed to the rd_kafka_error_* bindings
# @param message_prefix [String, nil] prefix for the human readable
#   representation
# @param broker_message [String, nil] broker message; when nil the
#   rd_kafka_err2str text for the code is used instead
# @return [RdkafkaError, false] false when the code is
#   RD_KAFKA_RESP_ERR_NO_ERROR (the pointer is not destroyed in that case)
def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
  error_code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr)
  return false if error_code == Bindings::RD_KAFKA_RESP_ERR_NO_ERROR

  description = broker_message || Rdkafka::Bindings.rd_kafka_err2str(error_code)

  # Every flag is read before rd_kafka_error_destroy is called on the pointer.
  flags = {
    fatal: !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero?,
    retryable: !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero?,
    abortable: !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?
  }
  Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr)

  new(error_code, message_prefix, broker_message: description, **flags)
end

.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rdkafka/error.rb', line 84

# Builds an error for the given response and raises it, if there is one.
#
# When the built error carries the generic fatal code
# (RD_KAFKA_RESP_ERR__FATAL, -150) and a client pointer is supplied, the error
# raised is instead the one produced by build_fatal, so that the underlying
# fatal error can be discovered from librdkafka.
#
# @param response_ptr_or_code [Integer, Bindings::Message, Object] passed on to
#   build
# @param message_prefix [String, nil] passed on to build
# @param broker_message [String, nil] passed on to build; also used as the
#   fallback message for build_fatal
# @param client_ptr [Object, nil] client pointer enabling the fatal error
#   lookup
# @return [false] when build reports no error
# @raise [RdkafkaError] when build returns an error
def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil, client_ptr: nil)
  built = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
  return false unless built

  generic_fatal = built.rdkafka_response == Bindings::RD_KAFKA_RESP_ERR__FATAL
  raise built unless generic_fatal && client_ptr

  raise build_fatal(
    client_ptr,
    fallback_error_code: built.rdkafka_response,
    fallback_message: broker_message
  )
end

Instance Method Details

#==(another_error) ⇒ Object

Error comparison



186
187
188
# File 'lib/rdkafka/error.rb', line 186

# Error comparison.
#
# Two errors are equal when the other object is an instance of this error's
# class (or of a subclass) and both render the same #to_s string.
#
# @param another_error [Object] object to compare against
# @return [Boolean]
def ==(another_error)
  return false unless another_error.is_a?(self.class)

  to_s == another_error.to_s
end

#abortable? ⇒ Boolean

Returns:

  • (Boolean)


198
199
200
# File 'lib/rdkafka/error.rb', line 198

# Abortable flag of this error.
#
# @return [Boolean] the value held in @abortable
def abortable? = @abortable

#code ⇒ Symbol

This error’s code, for example :partition_eof, :msg_size_too_large.

Returns:

  • (Symbol)


152
153
154
155
156
157
158
159
# File 'lib/rdkafka/error.rb', line 152

# This error's code, for example :partition_eof or :msg_size_too_large.
#
# The name comes from rd_kafka_err2name for the raw response; it is
# lower-cased and a single leading underscore, if present, is removed.
#
# @return [Symbol]
def code
  name = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase
  name = name[1..-1] if name.start_with?("_")
  name.to_sym
end

#fatal? ⇒ Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/rdkafka/error.rb', line 190

# Fatal flag of this error.
#
# @return [Boolean] the value held in @fatal
def fatal? = @fatal

#is_partition_eof? ⇒ Boolean

Whether this error indicates the partition is EOF.

Returns:

  • (Boolean)


181
182
183
# File 'lib/rdkafka/error.rb', line 181

# Whether this error indicates the partition is EOF, i.e. whether #code is
# :partition_eof.
#
# @return [Boolean]
def is_partition_eof? = (code == :partition_eof)

#retryable? ⇒ Boolean

Returns:

  • (Boolean)


194
195
196
# File 'lib/rdkafka/error.rb', line 194

# Retryable flag of this error.
#
# @return [Boolean] the value held in @retryable
def retryable? = @retryable

#to_s ⇒ String

Human readable representation of this error.

Returns:

  • (String)


163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/rdkafka/error.rb', line 163

# Human readable representation of this error.
#
# Layout: an optional "<message_prefix> - " lead, the rd_kafka_err2str text for
# the raw response, and the symbolic #code in parentheses. A broker message
# that is neither nil nor empty is appended on a new line.
#
# @return [String]
def to_s
  prefix = message_prefix
  lead = prefix ? "#{prefix} - " : ''

  description = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)
  summary = "#{lead}#{description} (#{code})"

  extra = broker_message
  return summary if extra.nil? || extra.empty?

  "#{summary}\n#{extra}"
end