ActiveMQ中如何设置消息的最大并发数?

ActiveMQ 中可以通过以下方式设置消息的最大并发数:

  1. Broker 端设置全局最大并发数:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"  
      dataDirectory="${activemq.data}">  

  <destinationPolicy>
    <policyEntry queue=">"> 
      <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="1000"/>  
      </pendingMessageLimitStrategy>
    </policyEntry>
  </destinationPolicy>
</broker>  

此设置将限制 Broker 中所有队列的 pending 消息数量最大为 1000。也就是消息生产者最多只能有 1000 条消息等待消费。

  1. Broker 端设置指定队列最大并发数:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"  
      dataDirectory="${activemq.data}">  

  <destinationPolicy>
    <policyEntry queue="queue1"> 
      <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="500"/>  
      </pendingMessageLimitStrategy>
    </policyEntry> 
  </destinationPolicy>
</broker>  

此设置将限制 queue1 这个队列的 pending 消息数量最大为 500。

  1. 生产者端限制发送消息的最大并发数:
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://broker:61616");  
Connection conn = cf.createConnection();
conn.start();  

Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  
MessageProducer producer = sess.createProducer(sess.createQueue("queue1"));

producer.setSendTimeout(0); // 设置立即超时
int maxPending = 500;      // 最多500条pending消息
producer.setSendBufferSize(maxPending); 
producer.setUseAsyncSend(true); // 异步发送  

for (int i=0; i<1000; i++) {  // 发送1000条消息
  producer.send(sess.createTextMessage(i+""));  
}

设置 producer 的 sendBufferSize 即可限制其最多只能有 500 条 pending 消息。超过此数量的发送请求将超时失败。