RocketMQ的延迟消息是指消息在指定的时间后再被投递给消费者,起到消息定时发送的作用。
RocketMQ通过消息采用SCHEDULE_TOPIC投递方式和Broker延迟级别来实现延迟消息。主要涉及的对象有:
- Message:消息类,包含属性scheduleTime来设置消息投递时间。
- MessageAccessor:消息访问器,用于设置消息的SCHEDULE_TOPIC属性。
- ScheduleMessageService:延迟消息定时服务,用于触发延迟消息的投递。
- DefaultMQProducerImpl:默认生产者实现类,用于检查消息属性并调用MessageAccessor设置SCHEDULE_TOPIC。
延迟消息主要流程:
- 生产者在发送消息时,调用msg.setScheduleTime(long scheduleTime)设置消息投递时间。
- 生产者发送消息前,检查消息是否为延迟消息。如果是,调用MessageAccessor.putProperty来设置SCHEDULE_TOPIC属性。
- Broker在接收消息后,会根据SCHEDULE_TOPIC属性将消息投递到__SCHEDULE_TOPIC队列而不是普通队列。
- ScheduleMessageService会定期从__SCHEDULE_TOPIC队列中拉取到期的延迟消息,并根据消息scheduleTime属性计算出延迟等级delayLevel。
- ScheduleMessageService调用broker的sendMessageInDelay方法将延迟消息发送到普通队列,同时更新consumerOffset。
- 消费者从普通队列消费到这条延迟消息,完成最终投递。
代码示例:
// 设置延迟消息
Message msg = new Message("TopicTest", "TagA", "KEY", "Hello".getBytes());
msg.setScheduleTime(System.currentTimeMillis() + 1000 * 60); // 1分钟后投递
// 发送延迟消息
producer.send(msg);
// ScheduleMessageService定时投递延迟消息
public void schedule() {
long now = System.currentTimeMillis();
for (int i = 0; i < 50; i++) {
Message message = this.messages.poll(1, TimeUnit.MILLISECONDS);
if (message != null) {
if (now >= message.getScheduleTime()) {
int delayLevel =
(int) ((now - message.getScheduleTime()) / this.brokerController
.getBrokerConfig().getScheduleMessageDelayLevel());
this.brokerController.getBroker2Client().sendMessageInDelay(message, delayLevel);
} else {
this.messages.add(message);
break;
}
}
}
}