RocketMQ中责任链模式:消息在不同处理模块间传递

在RocketMQ中使用了责任链模式来实现消息的多模块处理。

具体来说:

RocketMQ消息处理流程

RocketMQ在接收到消息后,会按照如下流程进行处理:

  1. 消息格式校验
  2. 内存数据量检查
  3. 放入内存队列
  4. 持久化到磁盘
  5. 根据规则路由给消费者
  6. 触发消费者消费
  7. 记录消费情况等

责任链

RocketMQ实现了一个责任链来处理消息:

public abstract class Handler {

  private Handler next;  

  public void link(Handler handler) {
      next = handler; 
   }

  public void proceed(Message msg) {

      // 自己的逻辑
      doHandle(msg);    

      if(next != null) {
          next.proceed(msg);   
       }
  }

  protected abstract void doHandle(Message msg);

}

每个处理模块实现一个Handler。

模块处理

class CheckHandler extends Handler {

   public void doHandle(Message msg) {

      // 检查消息  
   }
}

class PersistHandler extends Handler {

  public void doHandle(Message msg) {

     // 持久化消息  
  }

}

调用链

CheckHandler ch = new CheckHandler();
PersistHandler ph = new PersistHandler();

ch.link(ph);

Message message = ...

ch.proceed(message);

消息将按照责任链的顺序被各个模块依次处理。

作用

RocketMQ通过责任链模式可以:

  • 解耦各模块的处理逻辑
  • 避免模块直接接收消息
  • 方便添加新的处理模块
  • 顺序可以动态配置

总的来说,RocketMQ利用责任链模式实现消息的多模块处理。各模块解耦、有序地按链式结构处理消息。