ActiveMQ主要通过消息本地事务和XA分布式事务,实现消息交换的原子性:
一、本地事务
ActiveMQ支持消息和Session级别的本地事务:
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 发送消息
producer.send(message1);
producer.send(message2);
// 提交事务
session.commit();
// 回滚事务
session.rollback();
将消息发送放在事务中,要么全部成功,要么全部回滚。
保障基于单个Broker的消息交换的原子性。
二、XA分布式事务
利用JTA规范中的XA事务,在多个Broker之间实现分布式事务:
Connection conn1 = factory1.createConnection();
Connection conn2 = factory2.createConnection();
XASession session1 = conn1.createXASession();
XASession session2 = conn2.createXASession();
// 发布消息
session1.createProducer(...).send(msg1);
session2.createProducer(...).send(msg2);
// 提交分布式事务
session1.commit();
session2.commit();
两个Session具有相同的分布式事务ID,消息要么同时成功,要么同时回滚。
保障跨Broker的消息交换的原子性。
总结,ActiveMQ通过:
- 消息本地事务
- XA分布式事务
来实现消息交换的原子性。本地事务保障单Broker的原子性,XA分布式事务保障跨Broker的原子性。主要机制均通过”All or Nothing”的事务来实现。