RabbitMQ 支持消息的批量发送与接收。
主要通过以下方式实现:
- 生产者使用 channel.waitForConfirms() 方法实现消息批量发送。工作原理是:
- 生产者调用channel.confirmSelect()方法启用 Confirm 机制。
- 生产者进入循环,通过channel.waitForConfirms()方法等待确认信息。
- 在循环内部,生产者发布大量消息批量发送到RabbitMQ。
- RabbitMQ会异步发送确认信息到生产者。
- 如果确认信息确认了所有消息发送成功,生产者退出循环;如果有消息发送失败,生产者可以重新发送。
- 这样实现了消息批量发送而避免消息丢失的效果。
示例代码:
// 启用 Confirm 模式
channel.confirmSelect();
while (true) {
// 批量发送消息
for (int i = 0; i < batchSize; i++) {
channel.basicPublish(exchange, routingKey, null, message.getBytes());
}
// 等待 Confirm
if (channel.waitForConfirms()) {
break;
}
}
- 消费者使用 channel.basicQos() 方法限制 Prefetch Count,实现消息批量接收。工作原理是:
- 消费者在启动时调用channel.basicQos()方法,将prefetchCount设置为batch size,如100。
- 这样消费者每次只会从RabbitMQ接收100条消息。
- 消费者处理完这100条消息后,再从RabbitMQ接收下一批100条消息。
- 这样实现了消费者每次接收一定量消息的效果,避免消费者一次性接收太多消息。
示例代码:
// Prefetch Count = 100,每次接收100条消息
channel.basicQos(100);
// 定义消费逻辑
channel.basicConsume(queue, true, "consumer");
所以总结来说,RabbitMQ通过Confirm机制与basicQos方法可以实现消息的批量发送与接收。这需要我们在生产者与消费者定义时启用这两个功能,同时选择合适的配置参数。此外,我们也需要在批量消息场景下测试这两个功能,选择最优的参数,达到较好的batch messaging效果。