Module: Karafka::Patches::Rdkafka::Bindings
- Includes:
- Rdkafka::Bindings
- Defined in:
- lib/karafka/patches/rdkafka/bindings.rb
Overview
Binding patches that slightly change how rdkafka operates in certain places
Constant Summary collapse
- RB =
Alias internally
::Rdkafka::Bindings
- RebalanceCallback =
This patch changes few things: - it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process - reports callback errors into the errors instrumentation instead of the logger - catches only StandardError instead of Exception as we fully control the directly executed callbacks
FFI::Function.new( :void, %i[pointer int pointer pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| # Patch reference pr = ::Karafka::Patches::Rdkafka::Bindings tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i] if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE' pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) else pr.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) end end
Class Method Summary collapse
-
.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on cooperative rebalance.
-
.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on a eager rebalance.
Class Method Details
.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on cooperative rebalance
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 23 def on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr) opaque&.call_on_partitions_assigned(tpl) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque&.call_on_partitions_revoke(tpl) RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) opaque&.call_on_partitions_revoked(tpl) else opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_assigned(tpl) end end |
.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object
Handle assignments on a eager rebalance
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 48 def on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, partitions_ptr) opaque&.call_on_partitions_assigned(tpl) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque&.call_on_partitions_revoke(tpl) RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_revoked(tpl) else opaque&.call_on_partitions_assign(tpl) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) opaque&.call_on_partitions_assigned(tpl) end end |