RabbitMQ 可以实现高并发下的消息发送与接收。
主要通过以下方式:
- 生产者启用 Publisher Confirms 插件,在生产者高并发发送消息时,避免消息丢失。工作原理是:
- 生产者发送消息后,会异步接收 RabbitMQ 的确认信息。
- 如果 RabbitMQ 成功接收消息,会发送confirm给生产者。
- 如果消息在传输中丢失,RabbitMQ 不会发送确认信息,生产者可以重新发布消息。
- 这样避免了在高并发下,生产者快速发送大量消息导致的消息丢失问题。
示例代码:
// 开启confirm模式
channel.confirmSelect();
// 定期检查confirm
while (true) {
if (channel.waitForConfirms()) {
// 获取confirmed message seq no
}
}
// 发布高并发消息
for (int i = 0; i < 1000; i++) {
channel.basicPublish(exchagne, routingKey, null, message.getBytes());
}
- 消费者启用 basicQos 方法,限制接受消息的数量,避免消费者在高并发下被RabbitMQ淹没。工作原理是:
- 消费者启动时调用 basicQos 方法,将 prefetchCount 设置为适当值,如 100。
- 这样消费者每次只会接收 100 条消息,避免一次性收到太多消息。
- 消费者处理完一批消息后,再接收新的消息。
- 这样避免了消费者在高并发下难以处理过多消息的问题。
示例代码:
// prefetchCount=100,一次只接收100条消息
channel.basicQos(100);
// 定义消费逻辑,处理消息
channel.basicConsume(queue, true, "consumer");
所以总结来说,RabbitMQ 通过 Publisher Confirms 和 basicQos 方法可以实现高并发下的消息发送与接收。这需要我们在生产者与消费者定义时启用这两个功能,同时选择合适的配置参数。此外,我们也需要在高并发情况下实测这两个功能的效果,并根据实际情况选择参数,达到较好的并发 messaging 效果。