Class: Rdkafka::AbstractHandle
- Inherits:
-
FFI::Struct
- Object
- FFI::Struct
- Rdkafka::AbstractHandle
- Includes:
- Helpers::Time
- Defined in:
- lib/rdkafka/abstract_handle.rb
Overview
This class serves as an abstract base class to represent handles within the Rdkafka module. As a subclass of FFI::Struct
, this class provides a blueprint for other specific handle classes to inherit from, ensuring they adhere to a particular structure and behavior.
Subclasses must define their own layout, and the layout must start with:
layout :pending, :bool, :response, :int
Direct Known Subclasses
Rdkafka::Admin::CreateAclHandle, Rdkafka::Admin::CreatePartitionsHandle, Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteAclHandle, Rdkafka::Admin::DeleteGroupsHandle, Rdkafka::Admin::DeleteTopicHandle, Rdkafka::Admin::DescribeAclHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary collapse
- REGISTRY =
Registry for registering all the handles.
{}
Class Method Summary collapse
-
.register(handle) ⇒ Object
Adds handle to the register.
-
.remove(address) ⇒ Object
Removes handle from the register based on the handle address.
Instance Method Summary collapse
-
#create_result ⇒ Object
Operation-specific result.
-
#operation_name ⇒ String
The name of the operation (e.g. “delivery”).
-
#pending? ⇒ Boolean
Whether the handle is still pending.
-
#raise_error ⇒ Object
Allow subclasses to override.
-
#wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout.
Methods included from Helpers::Time
Class Method Details
.register(handle) ⇒ Object
Adds handle to the register
22 23 24 25 |
# File 'lib/rdkafka/abstract_handle.rb', line 22 def register(handle) address = handle.to_ptr.address REGISTRY[address] = handle end |
.remove(address) ⇒ Object
Removes handle from the register based on the handle address
30 31 32 |
# File 'lib/rdkafka/abstract_handle.rb', line 30 def remove(address) REGISTRY.delete(address) end |
Instance Method Details
#create_result ⇒ Object
Returns operation-specific result.
85 86 87 |
# File 'lib/rdkafka/abstract_handle.rb', line 85 def create_result raise "Must be implemented by subclass!" end |
#operation_name ⇒ String
Returns the name of the operation (e.g. “delivery”).
80 81 82 |
# File 'lib/rdkafka/abstract_handle.rb', line 80 def operation_name raise "Must be implemented by subclass!" end |
#pending? ⇒ Boolean
Whether the handle is still pending.
39 40 41 |
# File 'lib/rdkafka/abstract_handle.rb', line 39 def pending? self[:pending] end |
#raise_error ⇒ Object
Allow subclasses to override
90 91 92 |
# File 'lib/rdkafka/abstract_handle.rb', line 90 def raise_error raise RdkafkaError.new(self[:response]) end |
#wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout. If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation. In this case it is possible to call wait again.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rdkafka/abstract_handle.rb', line 57 def wait(max_wait_timeout: 60, wait_timeout: 0.1, raise_response_error: true) timeout = if max_wait_timeout monotonic_now + max_wait_timeout else nil end loop do if pending? if timeout && timeout <= monotonic_now raise WaitTimeoutError.new( "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds" ) end sleep wait_timeout elsif self[:response] != 0 && raise_response_error raise_error else return create_result end end end |