ActiveMQ中如何实现异步消费消息?详细代码demo

在ActiveMQ中,实现异步消费消息主要有两种方式:MessageListener方式和使用线程池。

MessageListener方式

使用MessageListener注册消息监听器,当有消息到达时,自动触发监听器进行消费处理。这种方式是ActiveMQ中最常用的异步消费消息方式。
通过实现MessageListener接口,然后注册到消息消费者(MessageConsumer)中,当有消息到达队列或主题时,ActiveMQ会异步地调用MessageListener的onMessage方法来消费消息。

使用线程池

开启一个线程池,将消费者放入独立线程中执行,从而实现异步消费。这种方式相比MessageListener方式,更加灵活,可以控制并发数量。

实现步骤如下

创建ActiveMQ连接工厂ConnectionFactory

创建ActiveMQ连接Connection

创建会话Session

创建消息目的地Destination

创建消息消费者Consumer

实现Consumer的onMessage方法,用于处理消息。

使用MessageListener方式代码

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
MessageConsumer consumer = session.createConsumer(destination);

consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        // 异步处理消息
    }
});

使用线程池代码

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
MessageConsumer consumer = session.createConsumer(destination);
ExecutorService executor = Executors.newFixedThreadPool(5);

consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        executor.execute(new Runnable() {
            public void run() {
                // 异步处理消息
            }
        });
    }
});

MessageListener除了用匿名类,还可以直接实现一个类,代码如下:

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // 处理消息逻辑
    }
}

// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);

// 注册消息监听器
consumer.setMessageListener(new MyMessageListener());