RocketMQ的延迟消息如何实现?

RocketMQ的延迟消息是指消息在指定的时间后再被投递给消费者,起到消息定时发送的作用。

RocketMQ通过消息采用SCHEDULE_TOPIC投递方式和Broker延迟级别来实现延迟消息。主要涉及的对象有:

  • Message:消息类,包含属性scheduleTime来设置消息投递时间。
  • MessageAccessor:消息访问器,用于设置消息的SCHEDULE_TOPIC属性。
  • ScheduleMessageService:延迟消息定时服务,用于触发延迟消息的投递。
  • DefaultMQProducerImpl:默认生产者实现类,用于检查消息属性并调用MessageAccessor设置SCHEDULE_TOPIC。

延迟消息主要流程:

  1. 生产者在发送消息时,调用msg.setScheduleTime(long scheduleTime)设置消息投递时间。
  2. 生产者发送消息前,检查消息是否为延迟消息。如果是,调用MessageAccessor.putProperty来设置SCHEDULE_TOPIC属性。
  3. Broker在接收消息后,会根据SCHEDULE_TOPIC属性将消息投递到__SCHEDULE_TOPIC队列而不是普通队列。
  4. ScheduleMessageService会定期从__SCHEDULE_TOPIC队列中拉取到期的延迟消息,并根据消息scheduleTime属性计算出延迟等级delayLevel。
  5. ScheduleMessageService调用broker的sendMessageInDelay方法将延迟消息发送到普通队列,同时更新consumerOffset。
  6. 消费者从普通队列消费到这条延迟消息,完成最终投递。

代码示例:

// 设置延迟消息
Message msg = new Message("TopicTest", "TagA", "KEY", "Hello".getBytes());
msg.setScheduleTime(System.currentTimeMillis() + 1000 * 60);    // 1分钟后投递

// 发送延迟消息
producer.send(msg);  

// ScheduleMessageService定时投递延迟消息
public void schedule() {
    long now = System.currentTimeMillis();
    for (int i = 0; i < 50; i++) {
        Message message = this.messages.poll(1, TimeUnit.MILLISECONDS);
        if (message != null) {
            if (now >= message.getScheduleTime()) {
                int delayLevel =
                    (int) ((now - message.getScheduleTime()) / this.brokerController
                        .getBrokerConfig().getScheduleMessageDelayLevel());
                this.brokerController.getBroker2Client().sendMessageInDelay(message, delayLevel);
            } else {
                this.messages.add(message);
                break;
            }
        }
    }
}