RocketMQ的消息轨迹如何记录和追踪?

RocketMQ提供消息轨迹(Message Trace)功能来记录和追踪消息的流转过程,方便监控消息和排查问题。

消息轨迹主要记录以下内容:

  • 消息投递状态:如发送成功、发送失败、消息堆积、消息消费成功等。
  • 消息的重试次数和间隔时间。
  • 消息从生产者发送到消费者消费的整体耗时。
  • 消息在各个Broker的积压时间和传输时间。
  • 消息的路由路径和队列堆积信息。
  • 消息消费失败的原因:如无消费者、消费者处理失败等。

消息轨迹通过RocketMQ提供的管理界面可以查看,也可以通过开发接口来访问。主要涉及的对象有:

  • TraceDispatcher:消息轨迹记录的发起者,生产者和Broker会调用其Record方法记录消息轨迹。
  • TraceAppender:消息轨迹的记录器,将轨迹信息encode后写入文件或数据库。
  • TraceManager:消息轨迹的管理器,负责加载Appenders和调度Appenders记录消息轨迹。
  • QueryMessageTraceRequestHeader:查询消息轨迹的请求,通过MQClientInstance发送至Broker。
  • GetMessageTraceResultResponseHeader:Broker返回的消息轨迹查询结果。

示例代码:

// 生产者记录消息轨迹
producer.getTraceDispatcher().recordSendMessage(msg, UUID.randomUUID().toString());

// 查询消息轨迹
QueryMessageTraceRequestHeader requestHeader = new QueryMessageTraceRequestHeader();
requestHeader.setMsgId(msgId);
requestHeader.setTraceId(traceId); 
GetMessageTraceResultResponseHeader response = (GetMessageTraceResultResponseHeader) producer.getMQClientInstance().getRemotingClient()
    .invokeSync(null, requestHeader, 1000 * 3);
MessageTrace trace = new MessageTrace();
trace.decode(response.getTraceData(), null);

消息轨迹功能可以帮助我们分析消息延迟的原因,查询消息路由信息,排查消息消费失败等问题。理解其实现机制与接口,有助于我们选择最佳的记录器配置和更好地运维RocketMQ集群。