RabbitMQ中如何实现消息的重发?

RabbitMQ 可以通过消息确认与补偿机制实现消息的重发。

工作原理是:

  1. 生产者在发送消息时开启确认机制 channel.confirmSelect()。
  2. 如果生产者在指定时间内未收到 RabbitMQ 的确认信息,即消息可能未正确投递,这时需要重发消息。
  3. 在消费端,如果消费者在处理消息时发生异常,可以将消息 nack 或 reject,RabbitMQ 会将消息重新入队,实现重发。
  4. 消息重发后可以被新的消费者消费,或者被投递到死信交换机进行异常处理。
  5. 通过消息确认与 nack 机制,在消息可能丢失时实现消息重投递的效果。

示例代码:

生产者:

// 开启确认模式  
channel.confirmSelect(); 
channel.basicPublish(exchange, routingKey, null, message.getBytes());

// 如果长时间未收到确认,需要重发消息 
if (!channel.waitForConfirms(5 * 1000)) { 
  channel.basicRecover(); 
  channel.basicPublish(exchange, routingKey, null, message.getBytes()); 
}

消费者:

// 消费消息  
channel.basicConsume(queue, false, "consumer");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

try {
  // 处理消息  
} catch (Exception e) {
  // 处理消息异常,nack消息 
  channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);  
}
  1. 生产者开启确认模式,如果长时间未收到确认,调用 channel.basicRecover() 重发消息。
  2. 消费者消费消息时发生异常,调用 channel.basicNack() 方法 nack 消息。
  3. 被 nack 的消息会被 RabbitMQ 重新入队,等待新的消费者消费,实现消息重发。

所以总结来说,RabbitMQ 通过消息确认与 nack 机制实现了消息重发的效果。这需要我们在生产者定义时开启确认模式,并在未收到确认时调用 channel.basicRecover() 方法重发消息。在消费者中,出现异常时正确调用 channel.basicNack() 方法 nack 消息。