Producing messages
When using Kafka, it is common to treat applications as stages of a larger pipeline (similar to a Bash pipeline) and to forward processing results to other applications. Karafka supports this by letting you use the WaterDrop message producer from anywhere within your application.
You can access the pre-initialized WaterDrop producer instance using the Karafka.producer method from any place within your codebase.
Karafka.producer.produce_async(
  topic: 'events',
  payload: Events.last.to_json
)
WaterDrop is thread-safe and operates well at scale.
If you want to produce messages from Karafka consumers, there's a handy alias method, #producer, for this:
class VisitsConsumer < ApplicationConsumer
  def consume
    ::Visit.insert_all(messages.payloads)

    producer.produce_async(
      topic: 'events',
      payload: { type: 'inserted', count: messages.count }.to_json
    )
  end
end
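The forwarding step above can be tried out without a running Kafka cluster by handing it any object that responds to produce_async(topic:, payload:). The following is a minimal sketch under stated assumptions: RecordingProducer and forward_insert_event are hypothetical names introduced only for illustration, and neither is part of Karafka or WaterDrop.

```ruby
require 'json'

# Hypothetical stand-in for the WaterDrop producer (an assumption for this
# sketch). It only records what it is given and never talks to Kafka.
class RecordingProducer
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Accepts the same keyword arguments used in the examples above.
  def produce_async(topic:, payload:)
    @messages << { topic: topic, payload: payload }
  end
end

# Hypothetical helper: builds the "inserted" notification and hands it
# to whichever producer-like object is passed in.
def forward_insert_event(producer, count)
  producer.produce_async(
    topic: 'events',
    payload: { type: 'inserted', count: count }.to_json
  )
end

recorder = RecordingProducer.new
forward_insert_event(recorder, 3)
```

Inside a consumer, the same helper could be called with producer in place of the stand-in. Passing the producer in as an argument keeps the message-building logic easy to check in isolation.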
Please follow the WaterDrop README for more details on how to use it.