在ActiveMQ中,实现异步消费消息主要有两种方式:MessageListener方式和使用线程池。
MessageListener方式
使用MessageListener注册消息监听器,当有消息到达时,自动触发监听器进行消费处理。这种方式是ActiveMQ中最常用的异步消费消息方式。
通过实现MessageListener接口,然后注册到消息消费者(MessageConsumer)中,当有消息到达队列或主题时,ActiveMQ会异步地调用MessageListener的onMessage方法来消费消息。
使用线程池
开启一个线程池,将消费者放入独立线程中执行,从而实现异步消费。这种方式相比MessageListener方式,更加灵活,可以控制并发数量。
实现步骤如下
创建ActiveMQ连接工厂ConnectionFactory
创建ActiveMQ连接Connection
创建会话Session
创建消息目的地Destination
创建消息消费者Consumer
实现Consumer的onMessage方法,用于处理消息。
使用MessageListener方式代码
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// 异步处理消息
}
});
使用线程池代码
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
MessageConsumer consumer = session.createConsumer(destination);
ExecutorService executor = Executors.newFixedThreadPool(5);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
executor.execute(new Runnable() {
public void run() {
// 异步处理消息
}
});
}
});
MessageListener除了用匿名类,还可以直接实现一个类,代码如下:
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
// 处理消息逻辑
}
}
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 注册消息监听器
consumer.setMessageListener(new MyMessageListener());