RocketMQ 如何保证消息顺序性?

应用 晚上 9:28
RocketMQ主要通过以下两种方式来保证消息的顺序性:

  1. 消息同步刷盘:在生产者发送消息时,设置brokerConfig.setSyncFlush(true)以启用同步刷盘。这会强制刷新每个消息的CommitLog到磁盘,保证消息顺序写磁盘。
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 把消息写入CommitLog
    final CommitLogDispatcher dispatcher = putMessageLockless(msg);
    // 同步刷盘
    if (brokerConfig.isSyncFlush()) {
        dispatcher.waitForSlaveCatchUp();
    } 
}
  1. 消息有序消费:在Push Consumer创建方式中,通过设置consumeMessageService.setOrder(true)以及consumeMessageService.putMessageQueue(mq)指定消费队列来实现消息有序消费。
public void createTopic(String topic, String groupName, MessageListener listener) {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
    // 设置有序消费
    consumer.setMessageListener(listener);
    consumer.setOrderly(true);
    consumer.subscribe(topic, SubscriptionMode.BY_CUSTOMER_ORDERLY);
    // 指定消费队列
    consumer.putMessageQueue(topic, 0); 
    consumer.start();
}

RocketMQ主要依靠同步刷盘来保证消息在存储层面的顺序性,并在消费层面通过有序消费机制来按顺序分发消息,实现最终数据的有序处理。