RocketMQ 如何保证消息的幂等性?

RocketMQ主要通过幂等生产者与消息去重来保证消息的幂等性。

  1. 幂等生产者:生产者设置producer.setRetryTimesWhenSendFailed(Integer.MAX_VALUE);以及producer.setRetryAnotherBrokerWhenNotStoreOK(true),这样在消息发送失败时,生产者会无限重试向其他Broker发送消息,保证消息最终到达Broker。
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    producer.setRetryTimesWhenSendFailed(Integer.MAX_VALUE);
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);
    // 发送消息
    return producer.send(msg);
}
  1. 消息去重:Broker在接收到消息后,会先判断是否是重复消息。重复消息会直接返回成功响应,并丢弃消息,保证不进行重复消费。

RocketMQ主要通过两级去重来判断消息是否重复:

  • 时间戳去重:如果接收消息的时间戳小于等于Broker已有消息的最大时间戳,则判断为重复消息。
  • 唯一ID去重:对每个Topic,Broker会维护一个长整形变量记载已消费消息的最大offset,如果接收消息的offset小于等于此变量,则判断为重复消息。

去重相关代码:

  • org.apache.rocketmq.broker.processor.SendMessageProcessor#isFilterMessage:实现时间戳去重判断。
  • org.apache.rocketmq.broker.processor.SendMessageProcessor#isDuplicateMessage:实现唯一ID去重判断。