Exception: Rdkafka::RdkafkaError

Inherits:
BaseError
  • Object
show all
Defined in:
lib/rdkafka/error.rb

Overview

Error returned by the underlying rdkafka library.

Direct Known Subclasses

RdkafkaTopicPartitionListError

Constant Summary collapse

EMPTY_HASH =

Empty hash for details default allocation

{}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#broker_message ⇒ String (readonly)

Error message sent by the broker

Returns:

  • (String)

22
23
24
# File 'lib/rdkafka/error.rb', line 22

# Error message sent by the broker.
#
# @return [String] the value held in @broker_message
def broker_message
  @broker_message
end

#details ⇒ Hash (readonly)

Optional hash of details specific to a given error, or an empty hash if there are none or the error does not support them

Returns:

  • (Hash)

26
27
28
# File 'lib/rdkafka/error.rb', line 26

# Optional details specific to this error.
#
# `build` fills this with :partition, :offset and :topic when the error
# comes from a message that has a topic handle, and uses EMPTY_HASH otherwise.
#
# @return [Hash] the value held in @details
def details
  @details
end

#message_prefix ⇒ String (readonly)

Prefix to be used for human readable representation

Returns:

  • (String)

18
19
20
# File 'lib/rdkafka/error.rb', line 18

# Prefix used for the human readable representation of this error
# (`to_s` prepends it followed by " - " when it is set).
#
# @return [String] the value held in @message_prefix
def message_prefix
  @message_prefix
end

#rdkafka_response ⇒ Integer (readonly)

The underlying raw error response

Returns:

  • (Integer)

14
15
16
# File 'lib/rdkafka/error.rb', line 14

# The underlying raw error response.
#
# @return [Integer] the value held in @rdkafka_response
def rdkafka_response
  @rdkafka_response
end

Class Method Details

.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object

[View source]

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/rdkafka/error.rb', line 51

# Builds an error out of a raw response code, a message struct or a native
# error pointer.
#
# @param response_ptr_or_code [Integer, Bindings::Message, Object] numeric
#   response code, a message struct exposing :err, :payload, :len, :rkt,
#   :partition and :offset, or anything else, which is handed to
#   `build_from_c` as a native error pointer
# @param message_prefix [String, nil] prefix for the human readable
#   representation; for messages it defaults to the payload when one is present
# @param broker_message [String, nil] error message sent by the broker
# @return [Object, false] a new error instance, or false when the response
#   carries no error (code zero)
def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  case response_ptr_or_code
  when Integer
    return false if response_ptr_or_code.zero?

    new(response_ptr_or_code, message_prefix, broker_message: broker_message)
  when Bindings::Message
    return false if response_ptr_or_code[:err].zero?

    # Fall back to the message payload as the prefix when the caller gave none
    unless response_ptr_or_code[:payload].null?
      message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
    end

    # Topic details can only be resolved when the message has a topic handle
    details = if response_ptr_or_code[:rkt].null?
                EMPTY_HASH
              else
                {
                  partition: response_ptr_or_code[:partition],
                  offset: response_ptr_or_code[:offset],
                  topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt])
                }.freeze
              end
    new(
      response_ptr_or_code[:err],
      message_prefix,
      broker_message: broker_message,
      details: details
    )
  else
    # Forward broker_message as the other branches do; without it a
    # caller-supplied broker message was silently dropped for native errors.
    build_from_c(response_ptr_or_code, message_prefix, broker_message: broker_message)
  end
end

.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object

[View source]

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rdkafka/error.rb', line 29

# Builds an error from a native error pointer and releases that pointer.
#
# @param response_ptr [Object] native error handle passed to the
#   `rd_kafka_error_*` bindings
# @param message_prefix [String, nil] prefix for the human readable representation
# @param broker_message [String, nil] broker message; when absent the
#   generic description from `rd_kafka_err2str` is used instead
# @return [Object, false] a new error instance, or false when the error
#   code is zero (in which case the pointer is not destroyed here)
def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
  bindings = Rdkafka::Bindings
  error_code = bindings.rd_kafka_error_code(response_ptr)
  return false if error_code.zero?

  description = broker_message || bindings.rd_kafka_err2str(error_code)

  # All flags must be read before the native error is destroyed below
  flags = {
    fatal: !bindings.rd_kafka_error_is_fatal(response_ptr).zero?,
    retryable: !bindings.rd_kafka_error_is_retriable(response_ptr).zero?,
    abortable: !bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?
  }

  bindings.rd_kafka_error_destroy(response_ptr)

  new(error_code, message_prefix, broker_message: description, **flags)
end

.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object

[View source]

84
85
86
87
# File 'lib/rdkafka/error.rb', line 84

# Raises the error built for the given response, if there is one.
#
# @param response_ptr_or_code [Object] anything accepted by `build`
# @param message_prefix [String, nil] passed through to `build`
# @param broker_message [String, nil] passed through to `build`
# @return [false] when `build` reports no error
# @raise the error returned by `build` when it is truthy
def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  built = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
  return false unless built

  raise built
end

Instance Method Details

#==(another_error) ⇒ Object

Error comparison

[View source]

146
147
148
# File 'lib/rdkafka/error.rb', line 146

# Error comparison.
#
# Two errors are equal when the other one is an instance of this error's
# class (or of a subclass) and both have the same string representation.
#
# @param another_error [Object] object to compare with
# @return [Boolean]
def ==(another_error)
  return false unless another_error.is_a?(self.class)

  to_s == another_error.to_s
end

#abortable? ⇒ Boolean

Returns:

  • (Boolean)
[View source]

158
159
160
# File 'lib/rdkafka/error.rb', line 158

# Reports the abortable flag stored on this error.
#
# NOTE(review): presumably populated from the `abortable:` keyword, which
# `build_from_c` derives from `rd_kafka_error_txn_requires_abort` — confirm
# against the constructor.
#
# @return [Boolean] the value held in @abortable
def abortable?
  @abortable
end

#code ⇒ Symbol

This error’s code, for example :partition_eof, :msg_size_too_large.

Returns:

  • (Symbol)
[View source]

112
113
114
115
116
117
118
119
# File 'lib/rdkafka/error.rb', line 112

# This error's code, for example :partition_eof or :msg_size_too_large.
#
# The name reported by `rd_kafka_err2name` is downcased and a single
# leading underscore, if any, is stripped.
#
# @return [Symbol]
def code
  Rdkafka::Bindings
    .rd_kafka_err2name(@rdkafka_response)
    .downcase
    .delete_prefix("_")
    .to_sym
end

#fatal? ⇒ Boolean

Returns:

  • (Boolean)
[View source]

150
151
152
# File 'lib/rdkafka/error.rb', line 150

# Reports the fatal flag stored on this error.
#
# NOTE(review): presumably populated from the `fatal:` keyword, which
# `build_from_c` derives from `rd_kafka_error_is_fatal` — confirm against
# the constructor.
#
# @return [Boolean] the value held in @fatal
def fatal?
  @fatal
end

#is_partition_eof? ⇒ Boolean

Whether this error indicates the partition is EOF.

Returns:

  • (Boolean)
[View source]

141
142
143
# File 'lib/rdkafka/error.rb', line 141

# Whether this error indicates the partition is EOF.
#
# @return [Boolean] true when `code` equals :partition_eof
def is_partition_eof?
  code == :partition_eof
end

#retryable? ⇒ Boolean

Returns:

  • (Boolean)
[View source]

154
155
156
# File 'lib/rdkafka/error.rb', line 154

# Reports the retryable flag stored on this error.
#
# NOTE(review): presumably populated from the `retryable:` keyword, which
# `build_from_c` derives from `rd_kafka_error_is_retriable` — confirm
# against the constructor.
#
# @return [Boolean] the value held in @retryable
def retryable?
  @retryable
end

#to_s ⇒ String

Human readable representation of this error.

Returns:

  • (String)
[View source]

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rdkafka/error.rb', line 123

# Human readable representation of this error.
#
# Shape: "<prefix> - <description> (<code>)", where the prefix part is
# omitted when no message prefix is set. A non-empty broker message is
# appended on its own line.
#
# @return [String]
def to_s
  prefix = message_prefix ? "#{message_prefix} - " : ''
  description = Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)
  summary = "#{prefix}#{description} (#{code})"

  # A missing or empty broker message adds nothing to the summary
  return summary if broker_message.nil? || broker_message.empty?

  "#{summary}\n#{broker_message}"
end