Delayed Topics

Karafka's Delayed Topics is a feature that enables delaying message processing from specific topics for a specified time. It can be beneficial to delay message processing for various reasons. For example, to allow additional processing or validation time, avoid overloading the system during high-traffic periods, or provide a retry mechanism for failed messages. Delayed Topics offer greater flexibility and control over message processing.

One of the benefits of the Delayed Topics feature is that it allows for an arbitrary delay without impacting the processing of other topics.

Delay is implemented by pausing the consumption of the partitions for a specified amount of time. This means that there is no explicit sleep or anything of that nature involved that would clog or impact other topics' operations, and all the available resources are free to process messages that are not expected to be delayed.

This makes Delayed Topics a good choice for applications that need to delay the processing of specific messages without impacting the processing of other messages in the system. Because the feature relies on Karafka's built-in partition pausing mechanism, delayed messages are processed both efficiently and reliably.

*Illustration presenting how Delayed Topics delays messages that are too young.
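
The idea of a message being "too young" can be sketched in plain Ruby. This is only a conceptual model and not Karafka's internal implementation; the method name `remaining_delay_ms` and its parameters are hypothetical and exist purely for illustration.

```ruby
# Conceptual model only, not Karafka's internal code.
# Returns how many milliseconds a message still has to wait before it is
# old enough to be processed (0 when it is already old enough).
def remaining_delay_ms(message_time, delay_ms, now: Time.now)
  age_ms = ((now - message_time) * 1_000).round
  [delay_ms - age_ms, 0].max
end

now = Time.at(1_700_000_000)

# Produced 20 seconds ago with a 60 second delay: 40 more seconds to wait
remaining_delay_ms(now - 20, 60_000, now: now) # => 40000

# Produced 90 seconds ago: already old enough
remaining_delay_ms(now - 90, 60_000, now: now) # => 0
```

In this model, a non-zero result corresponds to the time for which the partition would stay paused before the message is picked up.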

Enabling Delayed Topics

To enable the Delayed Topics feature in Karafka, you need to add the delay_by option to your Karafka routing configuration. Here's an example of how to do that:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      # Always delay processing messages from the orders topic by 1 minute
      delay_by(60_000)
    end
  end
end

Please keep in mind that the delay time needs to be provided in milliseconds.
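
Since the value is expressed in milliseconds, small conversion helpers can make routing definitions easier to read. The helpers below are hypothetical and not part of Karafka; they are a minimal sketch of one way to avoid hand-written magic numbers.

```ruby
# Hypothetical helpers, not part of Karafka.
# They convert human-friendly durations to the milliseconds that delay_by expects.
def seconds_to_ms(seconds)
  (seconds * 1_000).round
end

def minutes_to_ms(minutes)
  seconds_to_ms(minutes * 60)
end

minutes_to_ms(1)   # => 60000
seconds_to_ms(2.5) # => 2500
```

With such helpers in scope, the routing line from the example above could be written as `delay_by(minutes_to_ms(1))`.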

Delayed Topics vs. inline #sleep invocation

Using #sleep inside consumers is not recommended. Sleeping can heavily impact other topics and partitions and cause additional lag even for those that do not invoke it. The illustrations below compare the potential impact of sleeping on one of the topic partitions with using the Delayed Topics functionality instead:

Using #sleep on one of the partitions:

*Sleep can affect other topics and partitions running in parallel in other threads.

vs. delaying via the #delay_by API:

*#delay_by affects only the desired topic/partition, while others are processed as fast as possible.
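
The difference between the two approaches can be illustrated with a deliberately simplified simulation. It uses a single worker and a virtual clock measured in ticks, which is an assumption made for the sake of the example and not how Karafka schedules work internally. All names in it (`finish_times`, the `:delayed` and `:fast` partitions) are hypothetical.

```ruby
# Simplified simulation with a virtual clock (ticks); not Karafka code.
# Each message takes 1 tick to process. The message of the :delayed
# partition must not be processed before tick `delay`.
def finish_times(strategy, delay: 5)
  queues = { delayed: [:d1], fast: [:f1, :f2, :f3] }
  paused_until = Hash.new(0)
  finished = {}
  clock = 0

  until queues.values.all?(&:empty?)
    progressed = false

    queues.each do |partition, queue|
      next if queue.empty?
      # A paused partition is skipped without blocking the worker
      next if clock < paused_until[partition]

      if partition == :delayed && clock < delay
        if strategy == :sleep
          # Sleeping blocks the worker: the clock jumps and everything waits
          clock = delay
        else
          # Pausing only records when the partition may be picked up again
          paused_until[partition] = delay
          next
        end
      end

      clock += 1
      finished[queue.shift] = clock
      progressed = true
    end

    # Nothing was processable in this round, so time simply passes
    clock += 1 unless progressed
  end

  finished
end

finish_times(:sleep) # => {:d1=>6, :f1=>7, :f2=>8, :f3=>9}
finish_times(:pause) # => {:f1=>1, :f2=>2, :f3=>3, :d1=>6}
```

In both runs the delayed message finishes at tick 6, but only the sleeping variant pushes the unrelated messages back by the full delay.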

Limitations

While the Karafka Delayed Topics feature provides a valuable way to delay message processing, it does have some limitations to keep in mind.

One significant limitation is that the delay is not always millisecond-precise. This is because the Delayed Topics feature works by pausing the processing of a given partition for a specified amount of time and then unpausing it after that time has elapsed. However, the unpausing happens before the polling happens, so there can be a slight delay between when the partition is unpaused and when the delayed message is processed.

This means that if you need millisecond-precise timing for your application, there may be better choices than Delayed Topics. However, this limitation is unlikely to be a significant issue for most use cases.

This limitation also means that messages may be delayed slightly longer than the required minimum but will never be delayed less than expected.
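
The "never less, sometimes slightly more" behavior can be sketched with a simplified model. It assumes that a message created at time 0 becomes eligible once the delay has elapsed and is then picked up by the first poll that completes at or after that moment. The method name `observed_delay_ms` and the fixed 250 ms poll spacing are hypothetical and chosen only for illustration; real poll timing depends on your configuration and load.

```ruby
# Simplified model, not Karafka internals: the message is eligible at
# `delay_ms`, but it is only picked up by the first poll that completes
# at or after that moment. Returns nil when no such poll exists.
def observed_delay_ms(delay_ms, poll_completions_ms)
  poll_completions_ms.sort.find { |time| time >= delay_ms }
end

# Hypothetical polls completing every 250 ms
polls = (0..12_000).step(250).to_a

observed_delay_ms(10_000, polls) # => 10000 (a poll lands exactly on time)
observed_delay_ms(10_100, polls) # => 10250 (150 ms of extra lag)
```

Under this model the observed delay can only equal or exceed the requested one, which matches the guarantee described above.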

Below is an example distribution of the extra lag beyond the tested and expected ten seconds. In most cases, it is equal to or less than 10% of Karafka's max wait time.

Revocation and Shutdown

When using the Delayed Topics feature in Karafka, it is essential to note that both the #shutdown and #revoked methods may be executed without a prior #consume run. This is because Delayed Topics may delay the processing of the first set of messages, which means that the messages batch may be empty, and the first and last offsets taken from metadata will be equal to -1001.

In such scenarios, it is recommended to always check #used? before relying on the messages batch or its metadata in #revoked and #shutdown. A simple conditional is enough to verify that at least one batch was consumed or scheduled for consumption.

Below you can find an example scenario where a check is needed on #shutdown to verify that there are messages in the messages batch before committing the offset. This scenario occurs when the Karafka server is started and quickly shut down before the first batch of messages is old enough to be processed.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      delay_by(60_000)
      manual_offset_management true
    end
  end
end

class OrdersConsumer < ApplicationConsumer
  def consume
    puts messages.payloads
  end

  def shutdown
    # When using `#delay_by`, make sure there is a message
    # we want to mark as consumed upon shutdown.
    #
    # Please note that you do not need this check if you are not using
    # the filtering API or the Delayed Topics functionality.
    return unless used?

    mark_as_consumed messages.last
  end
end
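
The same guard applies to the revocation callback. The sketch below uses a small stand-in class so that it runs without the Karafka gem: only `used?`, `messages`, `consume` and `revoked` mirror names used in this document, while `SketchConsumer`, `receive` and `log` are hypothetical and exist only for illustration.

```ruby
# Stand-in for a Karafka consumer so the sketch runs without the gem.
# `receive` and `log` are hypothetical helpers, not Karafka APIs.
class SketchConsumer
  attr_reader :messages, :log

  def initialize
    @messages = []
    @log = []
    @used = false
  end

  # Simulates the framework handing a batch over to the consumer
  def receive(batch)
    @messages = batch
    @used = true
    consume
  end

  def used?
    @used
  end

  def consume; end

  # Same guard as in #shutdown: do not touch the batch if nothing was
  # ever consumed or scheduled for consumption
  def revoked
    return unless used?

    @log << "revoked after #{messages.last}"
  end
end

idle = SketchConsumer.new
idle.revoked
idle.log # => []

busy = SketchConsumer.new
busy.receive(%w[order-1 order-2])
busy.revoked
busy.log # => ["revoked after order-2"]
```

Without the guard, the idle consumer would read `messages.last` from an empty batch and log a meaningless entry.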

It is important to note that Delayed Topics can be a powerful tool for managing message processing, as it allows for messages to be processed in a controlled manner. However, it is essential to understand the potential side effects and implement appropriate error-handling mechanisms to ensure the system remains stable and reliable.

Example Use Cases

Here are some potential use cases for Delayed Topics:

  • General: By using Delayed Topics in conjunction with a Dead Letter Queue, you can create a more robust and dynamic system that handles a variety of failure scenarios and processes messages more efficiently.

  • Data crawling: The Delayed Topics feature can be helpful in data crawling applications, where immediately published data may not be immediately available due to HTTP caches. In such cases, it may be beneficial to delay the processing of messages for a fixed period, to ensure that all the caches have expired and the data is fully available. By using delayed processing, you can avoid processing incomplete or stale data and ensure that your application works with the latest, fully available information.

  • E-commerce: Delay processing of orders for a short period to allow for cancellation or modification of orders by customers. During this delay, additional validation can be performed, such as stock availability or fraud detection. If there is a problem with processing an order, it can be moved to a Dead Letter Queue for reprocessing later.

  • Social Media: Delay processing of user-generated content to allow for moderation by human teams before publishing. During the delay, messages can be stored in a separate queue and moderators can be notified to review them. If approved, the message can be moved to the main processing queue for publishing. If not approved, the message can be moved to a Dead Letter Queue for further action.

  • Finance: Delay processing of high-risk transactions to provide additional time for fraud detection. If a transaction is flagged as suspicious, it can be moved to a separate queue for human review. If a transaction fails to process due to a system error, it can be moved to a Dead Letter Queue for reprocessing at a later time.

  • Real-Time Monitoring: Delay processing of real-time monitoring data to enable better performance and reduce network congestion. By batching data and delaying processing, it can be processed more efficiently and effectively, and results can be displayed to users in a more timely manner.

  • Machine Learning: Delay processing of machine learning training data to avoid training on stale or inaccurate data. By delaying data processing, it is possible to ensure that the data used for training is up-to-date and accurate.

Summary

Karafka's Delayed Topics is a powerful feature that allows for arbitrary delays when processing messages from specific topics. It can be used in various use cases, such as e-commerce, social media moderation, and finance. By delaying message processing, you can perform additional processing or validation, moderate user-generated content, and introduce a retry mechanism for failed messages. By using Delayed Topics in conjunction with a Dead Letter Queue, you can create a more robust and dynamic system that can handle various business and failure scenarios.