RocketMQ 的消息重试机制是怎样的?

RocketMQ的消息重试机制主要在Push Consumer模式下使用,目的是保证消息能被消费者正常消费。

消息重试主要通过Broker端的定期消息重推和Consumer端的消息消费重试两部分实现。

Broker端消息重推实现主要在org.apache.rocketmq.broker.processor.ConsumeMessageProcessor类中:

public void run() {
    while (!this.isStopped() && this.brokerController.getBrokerConfig().isBrokerConsumeEnable()) {
        try {
            // 遍历所有订阅关系并重试
            for (SubscriptionData subscriptionData : this.brokerController.getConsumerManager().getAllSubscriptionList()) {   
                if (System.currentTimeMillis() >= subscriptionData.getLastConsumeTimestamp() + subscriptionData.getConsumeTimeoutMillis()) {
                    // 如果Consumer超过一定时间未消费,则重推消息
                    Set<MessageExt> messageExtSet = this.consumeMessageService.queryMessage(
                        subscriptionData.getTopic(), subscriptionData.getSubString(), 48, 0
                    );
                    for (MessageExt msg : messageExtSet) {
                        try {
                            this.brokerController.getBroker2Client().sendMessage(
                                msg.getTopic(), msg.getTags(), subscriptionData.getConsumerGroup(), msg
                            );
                        } catch (Exception e) {
                            //Ignore
                        }
                    }
                }
            } 
        } catch (Exception e) {
            log.error("ConsumeMessageProcessor, run method exception", e);
        }
    }
}

Consumer端消息消费重试实现主要在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl类中:

public void registerMessageListener(MessageListenerConcurrently messageListener) {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            while (!serviceState.isStopped()) {
                try {
                    // 拉取订阅消息
                    PullResult pullResult = pullMessage(messageListener);
                    this.processQueue.put(pullResult);
                    // 消费消息,如果消费失败则重试
                    DefaultMQPushConsumerImpl.this.consumeMessageService.consumeMessage(pullResult, ConsumeConcurrentlyStatus.RECONSUME_LATER);
                } catch (InterruptedException e) {
                    log.error("InterruptedException occurred during consume message! " + e.getMessage());
                } catch (Exception e) {
                    // 如果是ComsumeConcurrentlyStatus.CONSUME_SUCCESS类型则继续等待下条消息
                    if (e instanceof MQClientException) {
                        DefaultMQPushConsumerImpl.this.consumeMessageService.setLastConsumeTimestamp();
                        DefaultMQPushConsumerImpl.this.consumeMessageService.setConsumeOffset(pullResult.getNextBeginOffset());
                    }
                }
            }
        }
    });
}

RocketMQ主要通过Broker定期对超过一定时间未消费的消息进行重推,以及Consumer消费失败的消息进行重试消费,来实现消息的可靠推送与消费。