Class: Rdkafka::AbstractHandle
- Inherits: FFI::Struct (Rdkafka::AbstractHandle < FFI::Struct < Object)
- Includes: Helpers::Time
- Defined in: lib/rdkafka/abstract_handle.rb
Overview
This class serves as an abstract base class to represent handles within the Rdkafka module. As a subclass of FFI::Struct, this class provides a blueprint for other specific handle classes to inherit from, ensuring they adhere to a particular structure and behavior.
Subclasses must define their own layout, and the layout must start with:
layout :pending, :bool, :response, :int
Direct Known Subclasses
Rdkafka::Admin::CreateAclHandle, Rdkafka::Admin::CreatePartitionsHandle, Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteAclHandle, Rdkafka::Admin::DeleteGroupsHandle, Rdkafka::Admin::DeleteTopicHandle, Rdkafka::Admin::DescribeAclHandle, Rdkafka::Admin::DescribeConfigsHandle, Rdkafka::Admin::IncrementalAlterConfigsHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary
- REGISTRY = {}
  Registry for registering all the handles.
Class Method Summary
- .register(handle) ⇒ Object
  Adds handle to the register.
- .remove(address) ⇒ Object
  Removes handle from the register based on the handle address.
Instance Method Summary
- #create_result ⇒ Object
  Operation-specific result.
- #initialize ⇒ AbstractHandle (constructor)
  A new instance of AbstractHandle.
- #operation_name ⇒ String
  The name of the operation (e.g. “delivery”).
- #pending? ⇒ Boolean
  Whether the handle is still pending.
- #raise_error ⇒ Object
  Allow subclasses to override.
- #unlock ⇒ Object
  Unlock the resources.
- #wait(max_wait_timeout: 60, raise_response_error: true) ⇒ Object
  Wait for the operation to complete or raise an error if this takes longer than the timeout.
Methods included from Helpers::Time
Constructor Details
#initialize ⇒ AbstractHandle
Returns a new instance of AbstractHandle.
# File 'lib/rdkafka/abstract_handle.rb', line 39

def initialize
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new
  super
end
Class Method Details
.register(handle) ⇒ Object
Adds handle to the register
# File 'lib/rdkafka/abstract_handle.rb', line 26

def register(handle)
  address = handle.to_ptr.address
  REGISTRY[address] = handle
end
.remove(address) ⇒ Object
Removes handle from the register based on the handle address
# File 'lib/rdkafka/abstract_handle.rb', line 34

def remove(address)
  REGISTRY.delete(address)
end
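The two class methods above amount to a small lookup table keyed by address. A minimal sketch of that idea in plain Ruby follows. `SketchHandle` is hypothetical and not part of rdkafka, and `object_id` is only a stand-in for `handle.to_ptr.address` so that the example runs without the ffi gem.

```ruby
# Hypothetical stand-in for a handle class. The real classes are FFI::Struct
# subclasses keyed by handle.to_ptr.address; object_id plays that role here
# (an assumption made only so the sketch runs with the standard library).
class SketchHandle
  REGISTRY = {}

  class << self
    # Store the handle under its address-like key.
    def register(handle)
      REGISTRY[handle.address] = handle
    end

    # Delete the entry and return the handle that was stored, or nil.
    def remove(address)
      REGISTRY.delete(address)
    end
  end

  def address
    object_id
  end
end

handle = SketchHandle.new
SketchHandle.register(handle)

found = SketchHandle.remove(handle.address)   # the registered handle
missing = SketchHandle.remove(handle.address) # nil, already removed
```

Because `Hash#delete` returns the stored value, a caller that only knows the address can recover the Ruby object and drop it from the registry in a single call.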
Instance Method Details
#create_result ⇒ Object
Returns operation-specific result.
# File 'lib/rdkafka/abstract_handle.rb', line 103

def create_result
  raise "Must be implemented by subclass!"
end
#operation_name ⇒ String
Returns the name of the operation (e.g. “delivery”).
# File 'lib/rdkafka/abstract_handle.rb', line 98

def operation_name
  raise "Must be implemented by subclass!"
end
#pending? ⇒ Boolean
Whether the handle is still pending.
# File 'lib/rdkafka/abstract_handle.rb', line 49

def pending?
  self[:pending]
end
#raise_error ⇒ Object
Allow subclasses to override
# File 'lib/rdkafka/abstract_handle.rb', line 108

def raise_error
  raise RdkafkaError.new(self[:response])
end
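Taken together, `#operation_name`, `#create_result` and `#raise_error` describe what a subclass is expected to supply. The sketch below illustrates that contract with hypothetical names only: `SketchBase` uses a Hash in place of the FFI::Struct fields, and `SketchCreateTopicHandle` and `SketchTopicError` are invented for illustration and do not exist in rdkafka.

```ruby
# Hypothetical stand-in for AbstractHandle. A Hash replaces the FFI::Struct
# fields so the example runs without the ffi or rdkafka gems.
class SketchBase
  def initialize
    @fields = { pending: true, response: 0 }
  end

  def [](key)
    @fields[key]
  end

  def []=(key, value)
    @fields[key] = value
  end

  # Both of these must be supplied by a subclass.
  def operation_name
    raise "Must be implemented by subclass!"
  end

  def create_result
    raise "Must be implemented by subclass!"
  end

  # Default error behavior, which subclasses may override.
  def raise_error
    raise "operation failed with response #{self[:response]}"
  end
end

# Hypothetical error type used by the subclass below.
class SketchTopicError < StandardError; end

class SketchCreateTopicHandle < SketchBase
  def operation_name
    "create topic"
  end

  def create_result
    :topic_created
  end

  # Overrides the default to raise an operation-specific error.
  def raise_error
    raise SketchTopicError, "#{operation_name} failed with code #{self[:response]}"
  end
end

handle = SketchCreateTopicHandle.new
handle[:response] = 1 # arbitrary non-zero code chosen for the example

message = begin
  handle.raise_error
rescue SketchTopicError => e
  e.message
end
```

Here `message` ends up holding the subclass-specific text, while a subclass that skipped `operation_name` or `create_result` would hit the "Must be implemented by subclass!" error instead.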
#unlock ⇒ Object
Unlock the resources
# File 'lib/rdkafka/abstract_handle.rb', line 90

def unlock
  @mutex.synchronize do
    self[:pending] = false
    @resource.broadcast
  end
end
#wait(max_wait_timeout: 60, raise_response_error: true) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout. A timeout does not mean the operation failed; rdkafka might still be working on it, in which case wait can be called again.
# File 'lib/rdkafka/abstract_handle.rb', line 65

def wait(max_wait_timeout: 60, raise_response_error: true)
  timeout = max_wait_timeout ? monotonic_now + max_wait_timeout : MAX_WAIT_TIMEOUT_FOREVER

  @mutex.synchronize do
    loop do
      if pending?
        to_wait = (timeout - monotonic_now)

        if to_wait.positive?
          @resource.wait(@mutex, to_wait)
        else
          raise WaitTimeoutError.new(
            "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
          )
        end
      elsif self[:response] != 0 && raise_response_error
        raise_error
      else
        return create_result
      end
    end
  end
end
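The interplay between `#wait` and `#unlock` can be tried out without a broker. The following is a simplified sketch, not the library's implementation: `SketchWaitable` and `SketchWaitTimeoutError` are hypothetical, an instance variable replaces the `:pending` struct field, and the response check and the `nil` timeout case are left out. It shows a first `wait` timing out and a second `wait` on the same object succeeding once another thread calls `unlock`.

```ruby
# Hypothetical error type standing in for WaitTimeoutError.
class SketchWaitTimeoutError < RuntimeError; end

# Hypothetical, simplified stand-in for a handle.
class SketchWaitable
  def initialize
    @mutex = Thread::Mutex.new
    @resource = Thread::ConditionVariable.new
    @pending = true
  end

  def pending?
    @pending
  end

  # Mark the operation as finished and wake every waiting thread.
  def unlock
    @mutex.synchronize do
      @pending = false
      @resource.broadcast
    end
  end

  # Block until unlock is called or the deadline passes.
  def wait(max_wait_timeout: 60)
    deadline = monotonic_now + max_wait_timeout

    @mutex.synchronize do
      loop do
        return :done unless pending?

        to_wait = deadline - monotonic_now
        unless to_wait.positive?
          raise SketchWaitTimeoutError, "timed out after #{max_wait_timeout} seconds"
        end

        # Releases the mutex while sleeping; the loop re-checks the state
        # after every wake-up, including spurious ones.
        @resource.wait(@mutex, to_wait)
      end
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

handle = SketchWaitable.new

# First attempt: nothing unlocks the handle, so the wait times out.
timed_out = begin
  handle.wait(max_wait_timeout: 0.05)
  false
rescue SketchWaitTimeoutError
  true
end

# Second attempt on the same handle: a worker thread unlocks it.
worker = Thread.new do
  sleep 0.05
  handle.unlock
end
result = handle.wait(max_wait_timeout: 2)
worker.join
```

Measuring the deadline against a monotonic clock keeps the remaining wait time correct even if the system clock is adjusted while a thread is blocked.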