When working with Kafka, there is a setting called
It is the maximum delay between invocations of
poll() commands. This places an upper bound on the time the consumer can wait before fetching more records.
After exceeding this time, an error will be raised, the process will be removed from the group, and you may notice the following message:
Maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group
This value is effectively the maximum time you can spend processing messages fetched in a single
poll even if they come from different partitions. Once this is exceeded, the given process will be removed from the group. This can cause the group to become unstable due to frequent rebalances.
*Standard processing flow requires all the data to be processed before polling another batch of messages.
To mitigate this, you can do a few things:
- Extend the
- Use Virtual Partitions to parallelize the work further.
- Use Long-Running Jobs and not worry about that.
The strategy you want to go with heavily depends on your data and processing patterns. If you encounter this issue while maintaining a sane number of messages and decent
max.poll.interval.ms, you should further parallelize the processing or enable this feature.
Long-Running Jobs feature follows the Confluent recommended strategy of pausing a given partition for the time of the processing and resuming processing of the partition once the work is done.
That way, as long as no rebalances occur during the processing that would cause the partition to be revoked from the given process, polling can happen within the boundaries of
*With Long-Running Job enabled, the given partition is paused for the processing time, and polling happens independently from processing.
This feature is great for scenarios where your processing may last for a longer time period. For example, when you need to communicate with external systems, their performance periodically is not deterministic.
Using Long-Running Jobs
The only thing you need to add to your setup is the
long_running_job option in your routing section:
class KarafkaApp < Karafka::App setup do |config| # ... end routes.draw do topic :orders_states do consumer OrdersStatesConsumer long_running_job true end end end
Processing during revocation
Upon a group rebalance, there are three scenarios affecting the paused partition you are processing:
- Partition is not revoked because
cooperative-stickyassignment strategy is in use.
- Partition is revoked and re-assigned to the same process.
- Partition is revoked and assigned to a different process.
#revoked? method value changed independently from the workers' occupation. This means that the revocation status will be updated even if all the workers are busy processing long-running jobs.
Note: Revocation jobs are also non-blocking for long-running jobs. If the internal workers' batch is full, they will not block polling.
cooperative-sticky assignment strategy is recommended when using Long-Running Jobs. This can increase overall stability by not triggering revocation of partitions upon rebalances when partition would be re-assigned back:
setup_karafka do |config| config.kafka[:'partition.assignment.strategy'] = 'cooperative-sticky' end
Revocation and re-assignment
In the case of scenario
2, there is nothing you need to do. Karafka will continue processing your messages and resume partition after it is done with the work.
Revocation without re-assignment
If partition becomes assigned to a different process, this process will pick up the same messages you are currently working with. To mitigate this, Karafka has a
#revoked? method you can periodically check to ensure that a given process still owns the partition you are working with.
This method, in the case of the Long-Running Jobs feature, does not require marking messages as consumed or taking any other actions. Group state is updated asynchronously alongside the work being done.
def consume messages.each do |message| # Stop sending messages to the external service if we no longer own the partition return if revoked? ExternalSystemDispatcher.new.call(message) end end
Processing during shutdown
Karafka will wait for your Long-Running Jobs to finish within the limits of
shutdown_timeout. Either set it to a value big enough for the jobs to finish or implement periodic shutdown checks and enable manual offset management. Otherwise, Karafka may forcefully stop workers in the middle of processing after the
shutdown_timeout is exceeded.
During the shutdown, polling occurs, so there is no risk of exceeding the
# Note that for this to work, you need to manage offsets yourself # Otherwise, automatic offset management will commit offset of the last message def consume messages.each do |message| # Stop sending messages if the app is stopping return if Karafka::App.stopping? ExternalSystemDispatcher.new.call(message) mark_message_as_consumed(message) end end
Using Long-Running Jobs alongside regular jobs in the same subscription group
By default, Long-Running Jobs defined alongside regular jobs will be grouped in a single subscription group. This means they will share an underlying connection to Kafka and be subject to the same blocking polling limitations.
In case of a regular job blocking beyond
max.poll.interval.ms, Kafka will revoke the regular jobs and the defined Long-Running Jobs.
If you expect that your regular jobs within the same subscription group may cause Kafka rebalances or any other issues, separating them into different subscription groups is worth doing. This will ensure that external factors do not influence Long-Running Jobs's stability.
class KarafkaApp < Karafka::App setup do |config| # ... end routes.draw do # By providing this, all the long running jobs will get a separate Kafka connection that # won't be affected by other topics consumption in any way subscription_group 'long_running_jobs' do topic :orders_states do consumer OrdersStatesConsumer long_running_job true end end topic :deliveries_states do consumer DeliviersStatesConsumer end end end
Pausing in Long-Running Jobs
When using Karafka Long-Running Jobs, it is not recommended to use manual pausing because it can lead to unexpected behaviors and errors in the system. Long-Running Jobs automatically pause and resume topic partitions based on the consumption flow. In case of a manual pause that operates for a shorter duration than the consumer processing time, the partition may be resumed before the consumer finishes its processing, which can cause unexpected behaviors and errors in the system.
If a manual pause is needed, it is recommended to compute its duration based on the following formula:
max_remaining_processing_time + 2 * max_wait_time
This will ensure that the consumer has enough time to process all the messages in the batch before the partition is resumed.
Overall, it is crucial to be mindful of the potential risks and issues associated with manual pausing when using Karafka Long-Running Jobs. By following best practices and leveraging the built-in features of the framework, we can ensure that the system remains reliable, scalable, and performs as expected.
External Service Calls: In some cases, processing messages may require making HTTP requests to external services, which can take a long time to complete. For example, processing messages to perform payment processing, geocoding, or weather data retrieval may require making requests to external services that can take a significant amount of time to return a response.
IoT Data Processing: With the rise of the Internet of Things (IoT), data processing and analysis of IoT-generated data has become increasingly important. The processing of messages may involve analyzing sensor data, predicting equipment failure, and optimizing operations.
Complex Database Operations: Performing complex database operations such as joins, aggregations, or subqueries can take a significant amount of time, especially when dealing with large datasets. The processing of messages may involve performing such operations on the incoming data.
Data Cleaning and Preprocessing: Data cleaning and preprocessing can take significant time, especially when dealing with large datasets. The processing of messages may involve tasks such as data validation, data normalization, or data standardization.
These are just a few examples of how Long-Running Jobs can benefit different industries where processing messages takes a significant amount of time. Karafka's Long-Running Jobs feature can be used to develop and manage these jobs, enabling continuous data processing and analysis.