ActiveMQ如何保障消息业务的原子性?

ActiveMQ主要通过消息本地事务和XA分布式事务,实现消息交换的原子性:

一、本地事务

ActiveMQ支持消息和Session级别的本地事务:

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

// 创建生产者
MessageProducer producer = session.createProducer(destination);

// 发送消息
producer.send(message1);
producer.send(message2);

// 提交事务
session.commit();

// 回滚事务    
session.rollback();

将消息发送放在事务中,要么全部成功,要么全部回滚。
保障基于单个Broker的消息交换的原子性。

二、XA分布式事务

利用JTA规范中的XA事务,在多个Broker之间实现分布式事务:

Connection conn1 = factory1.createConnection();
Connection conn2 = factory2.createConnection();

XASession session1 = conn1.createXASession();  
XASession session2 = conn2.createXASession();

// 发布消息
session1.createProducer(...).send(msg1);
session2.createProducer(...).send(msg2);

// 提交分布式事务
session1.commit();
session2.commit();

两个Session具有相同的分布式事务ID,消息要么同时成功,要么同时回滚。
保障跨Broker的消息交换的原子性。

总结,ActiveMQ通过:

  1. 消息本地事务
  2. XA分布式事务

来实现消息交换的原子性。本地事务保障单Broker的原子性,XA分布式事务保障跨Broker的原子性。主要机制均通过”All or Nothing”的事务来实现。