应用 晚上 9:28
RocketMQ主要通过以下两种方式来保证消息的顺序性:
- 消息同步刷盘:在生产者发送消息时,设置brokerConfig.setSyncFlush(true)以启用同步刷盘。这会强制刷新每个消息的CommitLog到磁盘,保证消息顺序写磁盘。
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
// 把消息写入CommitLog
final CommitLogDispatcher dispatcher = putMessageLockless(msg);
// 同步刷盘
if (brokerConfig.isSyncFlush()) {
dispatcher.waitForSlaveCatchUp();
}
}
- 消息有序消费:在Push Consumer创建方式中,通过设置consumeMessageService.setOrder(true)以及consumeMessageService.putMessageQueue(mq)指定消费队列来实现消息有序消费。
public void createTopic(String topic, String groupName, MessageListener listener) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
// 设置有序消费
consumer.setMessageListener(listener);
consumer.setOrderly(true);
consumer.subscribe(topic, SubscriptionMode.BY_CUSTOMER_ORDERLY);
// 指定消费队列
consumer.putMessageQueue(topic, 0);
consumer.start();
}
RocketMQ主要依靠同步刷盘来保证消息在存储层面的顺序性,并在消费层面通过有序消费机制来按顺序分发消息,实现最终数据的有序处理。