ActiveMQ 中主要通过以下几种机制保证消息的可靠性传输:
- 持久化:将消息持久化到文件系统或数据库中,防止消息丢失。可以配置为消息持久化,会话持久化或订阅持久化。
- 事务:使用本地事务或 JTA 事务,可以对消息传输过程中的 ack、commit、rollback 进行控制,保证消息在发生失败的情况下不会丢失。
- 确认机制:生产者或消费者配置确认模式,可以指定消息何时 ack,以确保消息已成功发送或消费。
- 重发机制:当消息发送失败时,重发该消息,直至成功发送或超过最大重发次数。
- 高可用:使用 Master Slave 结构实现 Broker 的高可用,确保单点故障不会导致消息丢失。
- 网络重连:网络出现故障时,客户端与 Broker 可以自动重连,消息传输恢复。
例如:
开启持久化:
<broker>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
</broker>
使用事务:
// 开启事务
conn.startTransaction();
// 发送消息
MessageProducer producer = session.createProducer(destination);
producer.send(message);
// 提交或回滚事务
conn.commitTransaction()
conn.rollbackTransaction()
确认机制:
// 消息确认 - 自动确认
session.createConsumer(destination, true);
// 消息确认 - 手动确认
MessageConsumer consumer = session.createConsumer(destination, false);
consumer.receive();
consumer.acknowledge();
重发机制:
// 默认重发次数为6次
((ActiveMQConnectionFactory)connectionFactory).setRetryInterval(1000);
高可用配置:
<broker>
<haPolicy>
<sharedStore/>
</haPolicy>
<masterConnector>
<uri>tcp://master-broker:61616</uri>
</masterConnector>
<slaveConnector>
<uri>tcp://slave-broker:61616</uri>
</slaveConnector>
</broker>
网络重连:
<transportConnector uri="tcp://localhost:61616" reconnectDelay="5000" reconnectDelay="100"/>
通过上述机制的配置与组合,可以很好的保证 ActiveMQ 中消息的可靠性。根据实际应用的重要性选择适当的策略即可。