RocketMQ 如何保证消息的可靠性?

RocketMQ主要通过以下几个方面来保证消息的可靠性:

  1. 消息持久化:RocketMQ将所有接收到的消息都持久化存储在CommitLog中,保证消息不会丢失。
  2. 主从Replication:在Broker层面,Master与Slave实现双写机制,Master写入的消息会同步复制到Slave,保证高可用。
  3. 事务消息:通过事务消息机制实现生产者与Broker的二段式提交,保证消息能够被精确消费一次。
  4. 消息重试:在Push Consumer模式下,如果Consumer由于某些原因没有正常消费消息,Broker会定期重试推送该消息,保证其被消费。
  5. 消息回溯:当消息被多次推送至Consumer但仍未确认时,RocketMQ支持将消息回溯至指定的队列头部,避免消息被反复消费。

下面我们从代码层面分析这几个方面的实现:

  1. 消息持久化:在org.apache.rocketmq.store.CommitLog类中,消息会被写入MappedFileQueue的CommitLog文件中。
public PutMessageResult putMessage(MessageExtBrokerInner msg) { 
    // 把消息写入CommitLog
    final CommitLogDispatcher dispatcher = putMessageLockless(msg);
    // 同步双写到Slave
    if (brokerConfig.isSyncFlush()) {
        dispatcher.waitForSlaveCatchUp();
    }
}
  1. 主从Replication:org.apache.rocketmq.store.ha.HAService类实现Master将消息同步复制到Slave。
public void handleSlaveSynchronize(CommitLogDispatcher dispatcher) {
    if (this.haService instanceof HAService) {
        HAService haService = (HAService) this.haService;
        haService.slaveSynchronize(dispatcher.msg, dispatcher.result);
    }
}
  1. 事务消息:org.apache.rocketmq.common.message.MessageConst类中定义了事务消息相关的flag,org.apache.rocketmq.client.impl.MQClientAPIImpl类实现了事务消息的二段提交与回查。
  2. 消息重试:在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl类中,通过一定的重试策略实现定期消息重推送。
private void retrySendMessage(final String brokerAddr, final MessageExt msg, final int times) {
    ......
}
  1. 消息回溯:在org.apache.rocketmq.client.impl.consumer.MessageQueueLock类中,通过记录消费进度来实现消息回溯。
private void makeMessageQueueBack() {
    long offset = this.offset - BACK_OFFSET;
    ......
}