RocketMQ提供消息轨迹(Message Trace)功能来记录和追踪消息的流转过程,方便监控消息和排查问题。
消息轨迹主要记录以下内容:
- 消息投递状态:如发送成功、发送失败、消息堆积、消息消费成功等。
- 消息的重试次数和间隔时间。
- 消息从生产者发送到消费者消费的整体耗时。
- 消息在各个Broker的积压时间和传输时间。
- 消息的路由路径和队列堆积信息。
- 消息消费失败的原因:如无消费者、消费者处理失败等。
消息轨迹通过RocketMQ提供的管理界面可以查看,也可以通过开发接口来访问。主要涉及的对象有:
- TraceDispatcher:消息轨迹记录的发起者,生产者和Broker会调用其Record方法记录消息轨迹。
- TraceAppender:消息轨迹的记录器,将轨迹信息encode后写入文件或数据库。
- TraceManager:消息轨迹的管理器,负责加载Appenders和调度Appenders记录消息轨迹。
- QueryMessageTraceRequestHeader:查询消息轨迹的请求,通过MQClientInstance发送至Broker。
- GetMessageTraceResultResponseHeader:Broker返回的消息轨迹查询结果。
示例代码:
// 生产者记录消息轨迹
producer.getTraceDispatcher().recordSendMessage(msg, UUID.randomUUID().toString());
// 查询消息轨迹
QueryMessageTraceRequestHeader requestHeader = new QueryMessageTraceRequestHeader();
requestHeader.setMsgId(msgId);
requestHeader.setTraceId(traceId);
GetMessageTraceResultResponseHeader response = (GetMessageTraceResultResponseHeader) producer.getMQClientInstance().getRemotingClient()
.invokeSync(null, requestHeader, 1000 * 3);
MessageTrace trace = new MessageTrace();
trace.decode(response.getTraceData(), null);
消息轨迹功能可以帮助我们分析消息延迟的原因,查询消息路由信息,排查消息消费失败等问题。理解其实现机制与接口,有助于我们选择最佳的记录器配置和更好地运维RocketMQ集群。