Saturday 14 February 2015

Delay or Scheduled Message Delivery with RabbitMQ

In this tutorial, I will show you the logic behind delayed or scheduled message delivery with RabbitMQ, and how to implement it in Java. For a real-world example/use case of RabbitMQ, please go through this article.

Sometimes you don’t want messages in the queue to be read or delivered immediately. For example, if processing a fax message fails because of a network error, an immediate retry is pointless, so a delay is useful in this kind of scenario. Fortunately, RabbitMQ 2.8+ introduced Dead Letter Exchanges (DLX), which allow us to simulate message scheduling.

In practice, if we wanted to enable retry on failure every 3 minutes, the flow would look like this:
  1. Create Work Queue and bind it to Work Exchange.
  2. Create Delay Queue and bind it to Delay Exchange.
  3. Set x-dead-letter-exchange on Delay Queue to Work Exchange.
  4. Set x-message-ttl on Delay Queue to 180000 ms (3 minutes).
  5. Publish the message to Work Queue.
  6. The client reads the message from Work Queue and attempts to process it.
  7. If message processing fails, the client publishes the message to Delay Queue.
  8. The message stays in Delay Queue for 3 minutes.
  9. When the message TTL expires, the message is re-queued to Work Queue via Work Exchange for another attempt at processing.
  10. Repeat steps 6-9 until processing succeeds.
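
The retry loop in the steps above can be tried out without a broker. The following is only an in-memory sketch with a logical clock. It does not use the RabbitMQ client, and the class and method names (DelayFlowSimulation, attemptTimes and so on) are made up for illustration. It records the time of each processing attempt for a message that fails a given number of times before it succeeds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of the Work Queue / Delay Queue flow (hypothetical, no broker involved).
class DelayFlowSimulation {
    static final long TTL_MS = 180_000; // 3 minutes, as in step 4

    // A message parked in the delay queue, with the time at which its TTL runs out.
    static final class Parked {
        final String body;
        final long expiresAt;
        Parked(String body, long expiresAt) { this.body = body; this.expiresAt = expiresAt; }
    }

    final Deque<String> workQueue = new ArrayDeque<>();
    final Deque<Parked> delayQueue = new ArrayDeque<>();
    long now = 0; // logical clock in milliseconds

    void publishToDelay(String body) {
        delayQueue.addLast(new Parked(body, now + TTL_MS));
    }

    // Move the clock forward and "dead-letter" expired messages back to the work queue.
    void advance(long millis) {
        now += millis;
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAt <= now) {
            workQueue.addLast(delayQueue.pollFirst().body);
        }
    }

    // Returns the clock value at each processing attempt for one message.
    static List<Long> attemptTimes(int failuresBeforeSuccess) {
        DelayFlowSimulation sim = new DelayFlowSimulation();
        List<Long> attempts = new ArrayList<>();
        sim.workQueue.addLast("fax-1"); // step 5
        int failuresLeft = failuresBeforeSuccess;
        while (true) {
            String msg = sim.workQueue.pollFirst(); // step 6
            if (msg == null) {
                if (sim.delayQueue.isEmpty()) break; // nothing left anywhere
                sim.advance(sim.delayQueue.peekFirst().expiresAt - sim.now); // steps 8-9
                continue;
            }
            attempts.add(sim.now);
            if (failuresLeft > 0) { // step 7: processing failed, park the message
                failuresLeft--;
                sim.publishToDelay(msg);
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptTimes(2)); // prints [0, 180000, 360000]
    }
}
```

With two failures, the message is attempted at 0 ms, 180000 ms and 360000 ms, so each retry happens one TTL after the previous failure.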


Create the Work Queue:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

private static final String WORK_QUEUE = "WorkQueue";
private static final String WORK_EXCHANGE = "WorkExchange";

// Create a connection factory, then open a connection and a channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare a durable direct exchange and a durable queue,
// then bind Work Queue to Work Exchange with routing key "RK"
channel.exchangeDeclare(WORK_EXCHANGE, "direct", true);
channel.queueDeclare(WORK_QUEUE, true, false, false, null);
channel.queueBind(WORK_QUEUE, WORK_EXCHANGE, "RK", null);


Create the Delay Queue:

import java.util.HashMap;
import java.util.Map;

private static final String DELAY_QUEUE = "DelayQueue";
private static final String DELAY_EXCHANGE = "DelayExchange";

// Make Work Exchange the Dead Letter Exchange of Delay Queue, so that once the
// message TTL expires the message is sent to Work Queue via Work Exchange.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", WORK_EXCHANGE);
args.put("x-message-ttl", 180000); // 3 minutes, in milliseconds

// Declare Delay Exchange and Delay Queue, then bind Delay Queue to Delay Exchange
channel.exchangeDeclare(DELAY_EXCHANGE, "direct", true);
channel.queueDeclare(DELAY_QUEUE, true, false, false, args);
channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, "RK", null);
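
Since the TTL is given in milliseconds, it is easy to mistype. As a small sketch, the argument map could be built from a delay in minutes. The helper below (DelayQueueArgs.build) is hypothetical and not part of the RabbitMQ client. It uses only the two argument keys already shown and returns the TTL as an int, like the literal above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that builds the queueDeclare arguments for a delay queue.
class DelayQueueArgs {
    static Map<String, Object> build(String deadLetterExchange, long delayMinutes) {
        Map<String, Object> args = new HashMap<>();
        // Expired messages are re-published to this exchange
        args.put("x-dead-letter-exchange", deadLetterExchange);
        // Convert minutes to milliseconds; toIntExact throws if the value does not fit in an int
        args.put("x-message-ttl", Math.toIntExact(TimeUnit.MINUTES.toMillis(delayMinutes)));
        return args;
    }

    public static void main(String[] a) {
        System.out.println(build("WorkExchange", 3).get("x-message-ttl")); // prints 180000
    }
}
```

The resulting map can then be passed as the last parameter of channel.queueDeclare, in place of the hand-built args.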


Read from Work Queue:

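QueueingConsumer consumer = new QueueingConsumer(channel);
// Second parameter is autoAck: messages are acknowledged as soon as they are delivered
channel.basicConsume(WORK_QUEUE, true, consumer);

while (true) {
    // Blocks until the next message arrives
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    // If processing fails, hand the message over for a delayed retry
    if (!processSomething(message)) {
        processLater(message);
    }
}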


Publish to Delay Queue on message processing failure:

String message = new String(delivery.getBody());
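// Use the routing key "RK" that Delay Queue is bound with; a direct exchange
// does not route a message whose routing key matches no binding.
channel.basicPublish(DELAY_EXCHANGE, "RK", null, message.getBytes());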


Notes:
It's worth mentioning that the Delay Queue mechanism guarantees that the message will be delivered no earlier than the delay time, but not necessarily at the exact moment the delay is over.
