Latency and Throughput
Introduction
Latency management is crucial for optimizing the performance of any Kafka-based system. The Karafka framework's default settings aim to balance average latency and decent throughput. However, these default settings might not be suitable for all use cases, particularly those requiring low latency or high throughput. This document is intended to guide you through the necessary configurations and best practices to manage latency effectively in your Karafka setup.
The producer and consumer sides of a Kafka system have distinct roles and, consequently, require different strategies for latency management. For producers, the focus is often on optimizing the message batching and sending mechanisms to minimize delays. On the consumer side, the goal is to reduce the time it takes to fetch and process messages. This document will detail specific configurations and practices for producers and consumers to help you achieve the desired latency levels.
Understanding the Complexity of Latency and Throughput Management
Latency management in Karafka is a complex and multifaceted topic. While this document provides a comprehensive overview of latency management techniques and best practices, it cannot cover every possible aspect. Many factors influence latency, including framework configuration, message types, scale, and the nature of your processing workload. These factors often extend beyond the scope of framework documentation.
If you are seeking in-depth assistance or facing persistent latency issues, we recommend exploring our commercial offerings, which include, among other things, high-performance features, support, and consultancy services.
Producer Management
Managing latency and throughput on the producer side is critical for ensuring that messages are sent efficiently and quickly. This section covers the key aspects of producer latency management, including configuration tuning, batching and compression, asynchronous sending, and best practices.
Configuration Tuning
WaterDrop (Karafka's message producer) is fundamentally asynchronous, with a synchronous API around it. Any message you wait for will ultimately be dispatched from a background thread. Several settings impact how WaterDrop dispatches these messages.
queue.buffering.max.ms
The queue.buffering.max.ms parameter defines the maximum waiting time for additional messages before sending the current batch to the broker. Setting this to 0 or a very low value (like 1 ms) can help lower latency by minimizing wait times. However, this might increase overhead due to smaller batch sizes, impacting throughput and resource usage. When set to 0, messages are dispatched immediately, allowing for sub-millisecond synchronous dispatches at the cost of increased CPU and network usage and lower throughput.
Below are logs from dispatches with queue.buffering.max.ms set to 5 ms compared to queue.buffering.max.ms set to 0 in a local Kafka setup:
# buffer for 5ms and produce sync
[waterdrop-80ad817ceb09] Sync producing (...) took 5.458219051361084 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.43287992477417 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.429641962051392 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.377591848373413 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.311408996582031 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.407937049865723 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.422562122344971 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.36993408203125 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.342764139175415 ms
[waterdrop-80ad817ceb09] Sync producing (...) took 5.406744956970215 ms
vs.
# dispatch immediately
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.2399919033050537 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.28732800483703613 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.2857530117034912 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.28434205055236816 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.26418089866638184 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.279433012008667 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.2934098243713379 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.2789499759674072 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.3075120449066162 ms
[waterdrop-e2c291b6b0f3] Sync producing (...) took 0.24221014976501465 ms
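For reference, below is a minimal sketch of a standalone WaterDrop producer configured for immediate dispatch. The broker address and topic name are placeholders for illustration; adjust them to your environment.
require 'waterdrop'

# Sketch only: a producer tuned for immediate dispatch (no local buffering)
producer = WaterDrop::Producer.new do |config|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    # Do not wait to accumulate a batch; hand messages to the broker right away
    'queue.buffering.max.ms': 0
  }
end

# Synchronous dispatches now avoid the buffering delay visible in the first log above
producer.produce_sync(topic: 'events', payload: 'ping')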
batch.size
The batch.size parameter determines the maximum size of a batch of messages. Larger batch sizes can improve throughput but might increase latency. Finding the right balance is key to optimal performance.
compression.codec
Compression reduces the size of the messages sent to the broker, potentially decreasing network latency and increasing throughput. The trade-off is additional CPU overhead for compressing and decompressing. Test different compression settings to find the best balance for your use case.
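For contrast with the low-latency example above, here is a hedged sketch of throughput-oriented producer settings combining buffering, batch size, and compression. The values are illustrative starting points, not recommendations.
require 'waterdrop'

# Sketch only: favor larger, compressed batches over minimal latency
producer = WaterDrop::Producer.new do |config|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    # Give the internal queue a few milliseconds to accumulate messages
    'queue.buffering.max.ms': 5,
    # Cap on the total size (in bytes) of a single batch
    'batch.size': 1_000_000,
    # Compress batches to reduce network transfer at the cost of CPU
    'compression.codec': 'lz4'
  }
end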
request.required.acks
The request.required.acks parameter determines the number of acknowledgments the leader broker must receive from in-sync replica (ISR) brokers before responding to the producer. This setting is crucial for balancing message durability and producer latency.
- 0 (No Acknowledgement): The broker does not respond to the producer, and the message is considered delivered the moment it is dispatched. This setting provides the lowest latency since the producer doesn't wait for acknowledgments. It's highly performant, allowing for rapid message dispatch, but it risks data loss because there's no confirmation that the broker received the message. Suitable for non-critical data where speed is crucial.
- 1 (Leader Acknowledgement): The leader broker responds once it has written the message to its log. This setting balances latency and durability, providing a quick acknowledgment while ensuring the leader broker logs the message. It offers a good trade-off for most use cases, ensuring reasonable reliability with moderate latency.
- -1 or all (All ISR Acknowledgements): The leader broker waits until all in-sync replicas have acknowledged the message. This setting ensures maximum durability, minimizing the risk of data loss as the message is replicated across multiple brokers. However, it results in higher latency and reduced throughput. It is best for critical applications where data integrity is essential.
Choosing the right acks setting depends on your application's requirements:
- Low Latency Needs: Use acks=0 for the fastest message dispatch at the expense of reliability.
- Balanced Approach: Use acks=1 for a compromise between performance and reliability, suitable for most applications.
- High Durability Needs: Use acks=-1 or acks=all for critical applications where data loss is unacceptable despite the higher latency and reduced throughput.
Fine-Tuning acks with WaterDrop Variants
Using variants, you can customize request.required.acks within the same producer instance. This feature allows different configuration settings per topic while sharing TCP connections, optimizing producer efficiency.
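As a sketch of how this could look, assuming the WaterDrop variants API with topic_config and placeholder topic names (check the WaterDrop variants documentation for the exact options available in your version):
# Assumes a previously configured WaterDrop producer instance, as in the earlier sketches
critical = producer.with(topic_config: { 'request.required.acks': 'all' })
best_effort = producer.with(topic_config: { 'request.required.acks': 0 })

# Durable, slower dispatch for critical data
critical.produce_sync(topic: 'payments', payload: 'charge-created')

# Fast, fire-and-forget dispatch for non-critical data
best_effort.produce_async(topic: 'metrics', payload: 'heartbeat')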
socket.nagle.disable
The socket.nagle.disable parameter in librdkafka controls the use of the Nagle algorithm for Kafka broker connections. The Nagle algorithm is a TCP optimization that reduces the number of small packets sent over the network by combining them into larger packets. While this can improve network efficiency, it also introduces latency as small messages wait to be sent together.
- true: Disables the Nagle algorithm, ensuring messages are sent immediately without waiting to combine them into larger packets. This setting can significantly reduce latency, particularly in scenarios with many small messages, but may increase network overhead due to more frequent transmissions.
- false: Enables the Nagle algorithm, which can reduce the number of packets sent by aggregating smaller messages. This setting may improve overall throughput by reducing network load but at the cost of slightly higher latency.
When configuring socket.nagle.disable, consider your application's priorities:
- Low Latency Needs: Set socket.nagle.disable to true to minimize latency, ensuring that messages are sent as quickly as possible.
- Throughput Optimization: Set socket.nagle.disable to false to leverage the Nagle algorithm for reduced network traffic and improved throughput, if your application can tolerate the slight increase in latency.
Here's an example configuration:
class App < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      'socket.nagle.disable': true
    }
    config.shutdown_timeout = 60_000
    config.max_wait_time = 5_000 # Default max_wait_time for all topics
    config.max_messages = 50 # Default max_messages for all topics
  end
end
Asynchronous Producing
Asynchronous dispatch reduces waiting time for acknowledgments, allowing your application to continue processing tasks immediately. This significantly lowers end-to-end latency within your application and increases WaterDrop throughput.
Asynchronous producing is the recommended way for any non-critical messages.
producer.produce_many_async(
  [
    { topic: 'my-topic', payload: 'my message' },
    { topic: 'my-topic', payload: 'my message' }
  ]
)
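If delivery confirmation is eventually needed, the handles returned by the async API can be waited on later, keeping the dispatch itself non-blocking. Below is a sketch, assuming the returned handles respond to wait as the underlying rdkafka-ruby delivery handles do:
# Dispatch without blocking and keep the delivery handles around
handles = producer.produce_many_async(
  [
    { topic: 'my-topic', payload: 'my message' },
    { topic: 'my-topic', payload: 'my message' }
  ]
)

# Continue with other work, then optionally block until delivery is confirmed
handles.each(&:wait)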
Consumer Management
Tuning producers may seem straightforward, but managing consumers is a different and more complex matter. Understanding and optimizing consumer latency and throughput requires a deep dive into aspects that could fill several book chapters. The sections below should be viewed as an introduction rather than exhaustive documentation.
Consumer management is influenced by numerous external factors that go beyond the framework itself. These include:
- Types of Data Consumed: Different data types may require different processing strategies and resources.
- Nature of Assignments and Infrastructure: How partitions are assigned to consumers and the underlying infrastructure significantly impact performance.
- Type of Processing: Whether processing is CPU-intensive or IO-intensive affects how consumers should be tuned.
- Amount and Sizes of Data: The volume and size of consumed data are crucial in determining the optimal consumer configuration.
- Number of Worker Threads: The number of threads available for processing can influence how effectively data is consumed.
- Routing Setup: The complexity of the routing setup affects consumer performance.
- Data Production Patterns: Steady data streams versus sudden bursts (flushes) require different handling strategies to maintain performance.
- Topics Configuration and Number of Partitions: The configuration of topics and the number of partitions can significantly impact workload distribution and overall consumer efficiency.
This document is designed to provide you with a solid foundation in consumer latency management. It offers detailed explanations and descriptions that can serve as a basis for making informed decisions about consumer configurations. While it does not cover every possible scenario, it provides essential insights to help you navigate the complexities of consumer management in your Kafka setup, ensuring you are well-prepared for any situation.
The strategy and methods selected for consumer management can vary significantly depending on whether the priority is throughput or latency.
- Prioritizing Throughput: When the goal is to maximize throughput, the focus is on efficiently processing large volumes of data. Strategies include increasing batch sizes, optimizing worker thread counts, and ensuring consumers can handle high loads without frequent pauses. This approach might accept higher latency in exchange for processing more messages per unit of time.
- Prioritizing Latency: When low latency is the priority, the focus is on minimizing the time it takes for each message to be processed and acknowledged. This involves reducing batch sizes, using faster data processing methods, and ensuring that the consumer system is highly responsive. Here, throughput might be sacrificed, but quick message processing is maintained. A configuration sketch contrasting both approaches follows this list.
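As a rough illustration of how these priorities map onto Karafka settings, the sketch below uses arbitrary example values for both approaches; they are starting points for experimentation, not recommendations.
class App < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }

    # Latency-oriented example: poll often and hand small batches to workers quickly
    config.max_wait_time = 100
    config.max_messages = 10

    # Throughput-oriented alternative (for comparison):
    # config.max_wait_time = 5_000
    # config.max_messages = 1_000
  end
end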
No Silver Bullet for Latency and Throughput Tuning
There is no one-size-fits-all solution when it comes to tuning latency and throughput. As mentioned above, achieving optimal performance requires deeply understanding your specific expectations and use cases.
Scope of this Guide
This document focuses on aspects related to the operational flow of a single subscription group within Karafka. It provides guidance on tuning configurations and managing latency and throughput for individual subscription groups. However, it's important to note that system dynamics can differ significantly when dealing with multi-subscription group operations. The interplay between multiple groups, their configurations, and the shared resources can introduce additional complexities and considerations not covered in this document.
Prerequisites and Initial References
To effectively understand latency in Karafka consumer processes and the topics discussed in this document, it is essential to be familiar with several key concepts and operations within the framework. Here is a list of topics you should be accustomed to:
- Routing DSL, including multi-consumer group and multi-subscription group operations
- Concurrency and Multithreading design of the framework
- Error Handling (especially Backoff Policies)
- Offset Management Strategies
- Monitoring and Logging basics
- Web UI usage for monitoring
- Swarm/Multi-Process Mode
- Connection Multiplexing
- Virtual Partitions
Latency Types
When working with Karafka, there are two primary types of latency: consumption latency and processing latency. Understanding these will help you optimize performance and ensure timely message processing.
- Consumption Latency: Measures the time from when a message is created to when it is consumed.
  - Importance: Indicates the real-time performance of your consumer setup. High consumption latency suggests bottlenecks in message processing.
  - Impact of Processing Lag: Any increase in processing lag directly affects consumption latency. If the consumer process is overloaded, jobs will wait longer in the internal queue, increasing consumption lag.
- Processing Latency: Measures the time a batch of messages waits before being picked up by a worker.
  - Importance: Helps identify internal delays within the consumer process. High processing latency can indicate an overloaded system.
  - Relation to Consumption Latency: Increased processing latency will also increase consumption latency, as delays in processing lead to longer overall time in the system.
You will most often focus on consumption latency, as it reflects the overall performance and responsiveness of your consumer setup. Monitoring both consumption and processing latency helps identify and address performance issues, ensuring the efficient operation of your Karafka consumers.
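Below is a sketch of inspecting both metrics from within a consumer, assuming the batch metadata exposes consumption_lag and processing_lag (in milliseconds) as in recent Karafka versions; the consumer and topic context are placeholders.
class EventsConsumer < Karafka::BaseConsumer
  def consume
    metadata = messages.metadata

    # Log how long messages waited in Kafka and in the internal queue
    Karafka.logger.info(
      "consumption_lag=#{metadata.consumption_lag}ms processing_lag=#{metadata.processing_lag}ms"
    )

    messages.each do |message|
      # Regular processing goes here
    end
  end
end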
Configuration Tuning
Tuning consumer configurations in Karafka involves adjusting various settings that impact how messages are consumed and processed. These settings help balance latency, throughput, and resource utilization according to your requirements. Below are the primary settings within the Karafka configuration and important librdkafka-specific settings that you must consider.
Optimizing Data Fetching
Tuning the configurations below helps improve how fast or how much data Karafka can fetch from Kafka in a given time frame, as long as the consumer process is not blocked by processing or other factors. These are the primary ways to control latency and throughput when getting data from Kafka for the Ruby process.
Further sections also contain information on how to deal with lags caused by the processing phase.
Setting | Description | Default |
---|---|---|
max_wait_time | Maximum time a consumer will wait for messages before delegating them for processing. | 1000 ms (Karafka) |
max_messages | Maximum number of messages a consumer processes in a single batch. | 100 (Karafka) |
fetch.wait.max.ms | Maximum time the consumer waits for the broker to fill the fetch request with fetch.min.bytes worth of messages. | 500 ms (librdkafka) |
fetch.min.bytes | Minimum amount of data the broker should return for a fetch request. | 1 byte (librdkafka) |
fetch.message.max.bytes | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. | 1048576 bytes (1 MB) (librdkafka) |
fetch.error.backoff.ms | Wait time before retrying a fetch request in case of an error. | 500 ms (librdkafka) |
enable.partition.eof | Enables the consumer to raise an event when it reaches the end of a partition. | false (librdkafka) |
queued.max.messages.kbytes | Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. | 65536 KB (65 MB) (librdkafka) |
Below is an example latency impact of enabling enable.partition.eof and lowering fetch.wait.max.ms to 100 ms, compared to the default values, on a low-traffic topic (1 message per second). In both cases, max_wait_time was set to 2000 ms (less is better).
*Latency in ms on low-volume topic with different configurations (less is better).
The presented example illustrates how big an impact latency configuration can have. Proper tuning can tremendously influence Karafka's data ingestion patterns.
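For illustration, here is a hedged sketch of a low-latency fetch configuration similar to the one used in the comparison above; the values are examples, not recommendations.
class App < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      # Return fetch responses quickly instead of waiting to fill them
      'fetch.wait.max.ms': 100,
      # Signal when the end of a partition is reached
      'enable.partition.eof': true
    }
    config.max_wait_time = 2_000
  end
end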
Per Topic Configuration
In Karafka, you can configure settings per topic. This allows you to tailor the configuration to the specific needs of different topics, optimizing for various use cases and workloads. However, it's important to understand the implications of such configurations.
When reconfiguring settings per topic, Karafka will create a distinct subscription group and an independent connection to Kafka for each topic with altered non-default settings. This isolation ensures that the specific configurations are applied correctly but also means that these topics will be managed independently, which can impact resource usage and system behavior.
class App < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      'max.poll.interval.ms': 300_000
    }
    config.shutdown_timeout = 60_000
    config.max_wait_time = 5_000 # Default max_wait_time for all topics
    config.max_messages = 50 # Default max_messages for all topics
  end

  routes.draw do
    # This topic will use only defaults
    topic :default_topic do
      consumer DefaultConsumer
    end

    # This topic will use defaults that were not overwritten and special kafka level settings
    topic :custom_topic do
      consumer CustomConsumer

      # Custom max_messages for this topic
      max_messages 200

      # It is important to remember that defaults are **not** merged, so things like
      # `bootstrap.servers` need to be provided again
      kafka(
        'bootstrap.servers': 'localhost:9092',
        'fetch.wait.max.ms': 200,
        'queued.max.messages.kbytes': 50_000
      )
    end
  end
end
Parallel Processing
Aside from the fast polling of data from Kafka, Karafka optimizes the processing phase to reduce latency by processing more data in parallel. Even when data is in the in-process buffer, latency increases if it cannot be processed promptly. Karafka leverages native Ruby threads and supports multiple concurrency features to handle processing efficiently.
Polling-Related Factors Affecting Parallel Processing
Various polling-related factors can impact Karafka's ability to distribute and process obtained data in parallel. In some scenarios, the nature of the data or how it is polled from Kafka may prevent or reduce Karafka's ability to effectively distribute and process work in parallel. It's important to consider these factors when configuring and tuning your Karafka setup to ensure optimal performance.
Below, you can find a table summarizing the key aspects of Karafka's parallel processing capabilities, along with brief descriptions of each:
Aspect | Details |
---|---|
Concurrent Processing of Messages | Karafka uses multiple threads to process messages concurrently. |
Consumer Group Management | Each consumer group is managed by a separate thread, allowing for efficient data prefetching and buffering. |
Swarm Mode for Enhanced Concurrency | Forks independent processes to optimize CPU utilization, leveraging Ruby's Copy-On-Write (CoW) mechanism. It may enhance throughput and scalability by distributing the workload across multiple CPU cores. |
Subscription Groups for Kafka Connections | Karafka organizes topics into subscription groups to manage Kafka connections efficiently. Each subscription group operates in a separate background thread, sharing the worker pool for processing. |
Virtual Partitions | The Virtual Partitions feature allows for parallel processing of data from a single partition. |
Below is an example latency impact of enabling Virtual Partitions compared to the default values on a partition with 10 messages per second and an IO cost of around 150ms per message. In both cases, the same settings are used.
*Latency in ms on a partition with and without Virtual Partitions enabled (same traffic) with around 150ms of IO per message.
The presented example illustrates growing latency caused by the non-virtualized processing's inability to keep up with the steady pace of messages because of the IO cost. With Virtual Partitions, the work is distributed across multiple threads, so the system can keep up.
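For reference, here is a sketch of enabling Virtual Partitions (a Karafka Pro feature) in the routing. The topic, consumer, and partitioner below are illustrative; pick a partitioning key that distributes your real payloads evenly.
class App < Karafka::App
  routes.draw do
    topic :orders do
      consumer OrdersConsumer

      virtual_partitions(
        # Spread messages across virtual partitions based on the message key
        partitioner: ->(message) { message.key }
      )
    end
  end
end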
Prefetching Messages
Karafka is designed to prefetch data from Kafka while previously obtained messages are being processed. This prefetching occurs independently for each subscription group, ensuring continuous data flow as long as more data is available in Kafka. Prefetching behavior is governed by several settings mentioned in previous sections, such as queued.max.messages.kbytes, fetch.wait.max.ms, fetch.min.bytes, and fetch.message.max.bytes.
Prefetching Behavior
Normal Operations
Under normal operations, when there are no significant lags, there is little data ahead to process, so Karafka prefetches only small amounts of data from each assigned topic partition. For example, if 200 messages are available for processing from 10 partitions, Karafka might prefetch these messages in small batches, resulting in 10 independent jobs with 20 messages each.
*This example illustrates the work distribution of prefetched data coming from two partitions.
Data Spikes and Lags
The situation changes when Karafka experiences data spikes or when a significant amount of data is ahead. In such cases, Karafka will prefetch data in larger batches, especially with default settings. This can reduce Karafka's ability to parallelize work, except when using Virtual Partitions. Karafka may prefetch large chunks of data from a single topic partition during these periods. For instance, it may prefetch 500 messages from one partition, resulting in a single job containing messages from only that partition, thus limiting parallel processing.
This behavior is driven by the need to process data quickly. Still, it can lead to reduced parallelism when large batches from a single partition dominate the internal buffer.
*This example illustrates the lack of work distribution of prefetched data when the batch comes from a single partition.
Optimizing Prefetching for Parallelism
To mitigate the impact of large batch prefetching under lag conditions and to enhance system utilization, consider the following strategies:
- Prefetching Configuration Tuning: Adjust prefetching-related settings such as queued.max.messages.kbytes, fetch.wait.max.ms, fetch.min.bytes, and fetch.message.max.bytes to fine-tune the balance between throughput and latency. Tailoring these settings to your specific workload can optimize prefetching behavior.
- Multiple Subscription Groups: Configure multiple subscription groups to ensure that data from independent partitions (across one or many topics) is prefetched and processed independently. This setup can enhance system utilization, reduce latency, and increase throughput by allowing Karafka to handle more partitions concurrently (see the sketch after this list).
- Connection Multiplexing: Use connection multiplexing to create multiple connections for a single subscription group. This enables Karafka to independently prefetch data from different partitions, improving parallel processing capabilities.
- Virtual Partitions: Implement virtual partitions to parallelize processing within a single partition, maintaining high throughput even under lag conditions.
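Below is a sketch of isolating topics into separate subscription groups so each is polled over its own Kafka connection; the group, topic, and consumer names are placeholders.
class App < Karafka::App
  routes.draw do
    # The heavy topic gets its own connection and polling thread
    subscription_group 'heavy' do
      topic :bulk_imports do
        consumer BulkImportsConsumer
      end
    end

    # The light topic is polled independently, so it is not delayed by the heavy one
    subscription_group 'light' do
      topic :notifications do
        consumer NotificationsConsumer
      end
    end
  end
end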
Prefetching with Single Partition Assignments
If a Karafka consumer process is assigned only one topic partition, the prefetching behavior is straightforward and consistently fetches data from that single partition. In such cases, there are no concerns about parallelism or the need to distribute the processing load across multiple partitions or subscription groups. Your processing can still greatly benefit from using Virtual Partitions.
Conclusion
Understanding and configuring the prefetching behavior in Karafka is crucial for optimizing performance, especially under varying data loads. By adjusting settings and utilizing strategies like multiple subscription groups and connection multiplexing, you can enhance Karafka's ability to parallelize work, reduce latency, and increase throughput. This ensures that your Karafka deployment remains efficient and responsive, even during data spikes and lag recovery.
Subscription Group Blocking Polls
Karafka is designed to prebuffer data to ensure efficient message processing. Still, it's important to understand that this prefetched data is not utilized until all jobs based on data from the previous batch poll are completed. This behavior is by design and is a common characteristic of Kafka processors, not just in Ruby.
This approach prevents race conditions and ensures data consistency. The poll operation in Kafka acts as a heartbeat mechanism governed by the max.poll.interval.ms setting. This interval defines the maximum delay between poll invocations before the consumer is considered dead, triggering a rebalance. By ensuring that all jobs from the previous poll are finished before new data is used, Karafka maintains data integrity and avoids processing the same message multiple times.
Impact of Uneven Work Distribution
When a subscription group fetches data from multiple topics with uneven workloads (one topic with a lot of data to process and one with less), the smaller topic can experience increased latency. This happens because the consumer will only start processing new data after completing all jobs from the previous poll, including the large dataset. Consequently, messages from the less busy topic must wait until the more intensive processing is finished.
Uneven work distribution with Virtual Partitions can lead to similar latency issues. If the partition key or reducer used for virtual partitions is not well balanced, certain virtual partitions may have significantly more data to process than others. This imbalance causes delays, as the consumer waits for all data from the more loaded partitions to be processed before moving on to the next batch.
Recommendations for Mitigating Latency Issues
To address these issues, consider the following strategies:
- Multiplexing: Use connection multiplexing to create multiple connections for a single subscription group. This allows Karafka to fetch and process data from different partitions independently, improving parallel processing and reducing latency.
- Multiple Subscription Groups: Configure multiple subscription groups to distribute the workload more evenly. By isolating topics with significantly different workloads into separate subscription groups, you can ensure that heavy processing on one topic does not delay processing on another.
- Monitoring and Tuning Work Distribution: Regularly monitor the performance and work distribution of your virtual partitions. Ensure that your partition key and reducer are well-balanced to avoid uneven workloads. Fine-tuning these elements can help maintain efficient and timely processing across all partitions.
Summary
Managing consumers in Karafka involves numerous internal and external factors. Each case presents unique challenges, requiring a tailored approach to optimization.
Consumer management is influenced by data types, infrastructure, processing nature (CPU vs. IO-intensive), data volume, and worker threads. Key Karafka settings like max_wait_time, max_messages, and fetch.wait.max.ms play a crucial role in data fetching and processing efficiency.
External factors, such as infrastructure setup, network conditions, and data production patterns, can significantly impact performance. To stay ahead of these potential issues, monitor consumption and processing latency regularly; this is key to identifying bottlenecks and keeping the system responsive.
Optimization strategies, including multiple subscription groups, connection multiplexing, and virtual partitions, help balance workloads and enhance parallel processing. Each use case demands a unique configuration, underscoring the need for a thorough understanding of the framework and application requirements.
In conclusion, effective consumer management in Karafka requires considering various factors and regular adjustments to maintain efficiency and responsiveness.