ActiveMQ 中如何保证消息的可靠性传输?

ActiveMQ 中主要通过以下几种机制保证消息的可靠性传输:

  1. 持久化:将消息持久化到文件系统或数据库中,防止消息丢失。可以配置为消息持久化,会话持久化或订阅持久化。
  2. 事务:使用本地事务或 JTA 事务,可以对消息传输过程中的 ack、commit、rollback 进行控制,保证消息在发生失败的情况下不会丢失。
  3. 确认机制:生产者或消费者配置确认模式,可以指定消息何时 ack,以确保消息已成功发送或消费。
  4. 重发机制:当消息发送失败时,重发该消息,直至成功发送或超过最大重发次数。
  5. 高可用:使用 Master Slave 结构实现 Broker 的高可用,确保单点故障不会导致消息丢失。
  6. 网络重连:网络出现故障时,客户端与 Broker 可以自动重连,消息传输恢复。

例如:
开启持久化:

<broker>
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
</broker> 

使用事务:

// 开启事务 
conn.startTransaction();

// 发送消息  
MessageProducer producer = session.createProducer(destination);
producer.send(message);

// 提交或回滚事务
conn.commitTransaction() 
conn.rollbackTransaction()

确认机制:

// 消息确认 - 自动确认 
session.createConsumer(destination, true);  

// 消息确认 - 手动确认
MessageConsumer consumer = session.createConsumer(destination, false); 
consumer.receive();
consumer.acknowledge();

重发机制:

// 默认重发次数为6次
((ActiveMQConnectionFactory)connectionFactory).setRetryInterval(1000);

高可用配置:

<broker> 
    <haPolicy>
        <sharedStore/>
    </haPolicy>    
    <masterConnector>
        <uri>tcp://master-broker:61616</uri> 
    </masterConnector>
    <slaveConnector>
        <uri>tcp://slave-broker:61616</uri>        
    </slaveConnector>
</broker>

网络重连:

<transportConnector uri="tcp://localhost:61616" reconnectDelay="5000" reconnectDelay="100"/>

通过上述机制的配置与组合,可以很好的保证 ActiveMQ 中消息的可靠性。根据实际应用的重要性选择适当的策略即可。