Exception: Rdkafka::RdkafkaError

Inherits:
BaseError
  • Object
show all
Defined in:
lib/rdkafka/error.rb

Overview

Error returned by the underlying rdkafka library.

Direct Known Subclasses

RdkafkaTopicPartitionListError

Constant Summary collapse

EMPTY_HASH =

Empty hash for details default allocation

{}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#broker_messageString (readonly)

Error message sent by the broker

Returns:

  • (String)


22
23
24
# File 'lib/rdkafka/error.rb', line 22

def broker_message
  @broker_message
end

#detailsHash (readonly)

Optional details hash specific to a given error or empty hash if none or not supported

Returns:

  • (Hash)


26
27
28
# File 'lib/rdkafka/error.rb', line 26

def details
  @details
end

#message_prefixString (readonly)

Prefix to be used for human readable representation

Returns:

  • (String)


18
19
20
# File 'lib/rdkafka/error.rb', line 18

def message_prefix
  @message_prefix
end

#rdkafka_responseInteger (readonly)

The underlying raw error response

Returns:

  • (Integer)


14
15
16
# File 'lib/rdkafka/error.rb', line 14

def rdkafka_response
  @rdkafka_response
end

Class Method Details

.build(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/rdkafka/error.rb', line 51

def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  case response_ptr_or_code
  when Integer
    return false if response_ptr_or_code.zero?

    new(response_ptr_or_code, message_prefix, broker_message: broker_message)
  when Bindings::Message
    return false if response_ptr_or_code[:err].zero?

    unless response_ptr_or_code[:payload].null?
      message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
    end

    details = if response_ptr_or_code[:rkt].null?
                EMPTY_HASH
              else
                {
                  partition: response_ptr_or_code[:partition],
                  offset: response_ptr_or_code[:offset],
                  topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt])
                }.freeze
              end
    new(
      response_ptr_or_code[:err],
      message_prefix,
      broker_message: broker_message,
      details: details
    )
  else
    build_from_c(response_ptr_or_code, message_prefix)
  end
end

.build_from_c(response_ptr, message_prefix = nil, broker_message: nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rdkafka/error.rb', line 29

def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
  code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr)

  return false if code.zero?

  message = broker_message || Rdkafka::Bindings.rd_kafka_err2str(code)
  fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero?
  retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero?
  abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?

  Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr)

  new(
    code,
    message_prefix,
    broker_message: message,
    fatal: fatal,
    retryable: retryable,
    abortable: abortable
  )
end

.validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil) ⇒ Object



84
85
86
87
# File 'lib/rdkafka/error.rb', line 84

def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil)
  error = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
  error ? raise(error) : false
end

Instance Method Details

#==(another_error) ⇒ Object

Error comparison



139
140
141
# File 'lib/rdkafka/error.rb', line 139

def ==(another_error)
   another_error.is_a?(self.class) && (self.to_s == another_error.to_s)
end

#abortable?Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/rdkafka/error.rb', line 151

def abortable?
  @abortable
end

#codeSymbol

This error’s code, for example :partition_eof, :msg_size_too_large.

Returns:

  • (Symbol)


112
113
114
115
116
117
118
119
# File 'lib/rdkafka/error.rb', line 112

def code
  code = Rdkafka::Bindings.rd_kafka_err2name(@rdkafka_response).downcase
  if code[0] == "_"
    code[1..-1].to_sym
  else
    code.to_sym
  end
end

#fatal?Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/rdkafka/error.rb', line 143

def fatal?
  @fatal
end

#is_partition_eof?Boolean

Whether this error indicates the partition is EOF.

Returns:

  • (Boolean)


134
135
136
# File 'lib/rdkafka/error.rb', line 134

def is_partition_eof?
  code == :partition_eof
end

#retryable?Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/rdkafka/error.rb', line 147

def retryable?
  @retryable
end

#to_sString

Human readable representation of this error.

Returns:

  • (String)


123
124
125
126
127
128
129
130
# File 'lib/rdkafka/error.rb', line 123

def to_s
  message_prefix_part = if message_prefix
                   "#{message_prefix} - "
                 else
                   ''
                 end
  "#{message_prefix_part}#{Rdkafka::Bindings.rd_kafka_err2str(@rdkafka_response)} (#{code})"
end