跳到主要内容

10、ActiveMQ 实战 - 组合队列

组合队列(Composite Destinations)支持通过一个目的地名称,将同一条消息同时发送到多个队列或主题。

1 客户端实现方式

1.1 实现概述

在composite destinations中,多个destination之间采用“,”分隔。例如:
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

1.2 代码实现

1.2.1 容器中注入compositeDestinationContext

/**
 * Builds the {@link ActiveMqContext} bean used to publish to the composite destination.
 *
 * <p>Creates and starts a connection from {@code ActiveMQUtil.factory}, opens a
 * non-transacted {@code AUTO_ACKNOWLEDGE} session and attaches a producer to the
 * destination named by {@code compositeDestinationName}. If any step after the
 * connection has been created fails, the connection is closed before the exception
 * propagates so that it is not leaked.
 *
 * @return a context constructed from the connection, session and producer
 * @throws JMSException if the connection, session, destination or producer cannot be created
 */
@Bean
public ActiveMqContext compositeDestinationContext() throws JMSException {
    Connection connection = ActiveMQUtil.factory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // compositeDestinationName=composite-queue1,topic://composite-topic1
        // (comma-separated members; the topic:// prefix marks the member that is not a queue)
        Destination destination = session.createQueue(compositeDestinationName);
        MessageProducer producer = session.createProducer(destination);
        return new ActiveMqContext(connection, session, null, producer, null, null, Boolean.FALSE);
    } catch (JMSException | RuntimeException e) {
        // Setup failed half-way: release the connection instead of leaking it.
        try {
            connection.close();
        } catch (JMSException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}

1.2.2 CompositeProcedure

/**
 * Sending side of the composite-destination demo.
 *
 * <p>Delegates every send to {@code ActiveMQUtil.sendMsg} using the context bean
 * registered under the name {@code compositeDestinationContext}.
 */
@Component
public class CompositeProcedure {

    /** Messaging context, injected by bean name. */
    @Resource(name = "compositeDestinationContext")
    private ActiveMqContext context;

    /**
     * Sends a single message through the injected context.
     *
     * @param msg the message payload handed to {@code ActiveMQUtil.sendMsg}
     * @throws JMSException if {@code ActiveMQUtil.sendMsg} fails
     */
    public void sendMsg(String msg) throws JMSException {
        ActiveMQUtil.sendMsg(this.context, msg);
    }
}

1.2.3 CompositeQueueConsumer

@Slf4j
@Component
public class CompositeQueueConsumer {

  public void receiveMsg() throws JMSException, InterruptedException {
            Connection connection = ActiveMQUtil.factory.createConnection();
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("composite-queue1");
            MessageConsumer consumer = session.createConsumer(queue);
            connection.start();
            MessageListener listener1 = message -> {
                      try {
                                log.info("queue receive message:{}", ((TextMessage)message).getText());
                      } catch (Exception e) {
                                e.printStackTrace();
                      }
            };
            consumer.setMessageListener(listener1);
            Thread.sleep(5000000L);
  }

}

1.2.4 CompositeTopicConsumer

@Slf4j
@Component
public class CompositeTopicConsumer {

  public void receiveMsg() throws JMSException, InterruptedException {
            Connection connection = ActiveMQUtil.factory.createConnection();
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("composite-topic1");
            MessageConsumer consumer = session.createConsumer(topic);
            connection.start();
            MessageListener listener = message -> {
                      try {
                                log.info("topic receive message:{}", ((TextMessage)message).getText());
                      } catch (Exception e) {
                                e.printStackTrace();
                      }
            };
            consumer.setMessageListener(listener);
            Thread.sleep(5000000L);
  }

}

1.2.5 测试代码

// Sender under test; publishes to the composite destination.
@Autowired
private CompositeProcedure procedure;

// Consumer for the queue member of the composite destination.
@Autowired
private CompositeQueueConsumer queueConsumer;

// Consumer for the topic member of the composite destination.
@Autowired
private CompositeTopicConsumer topicConsumer;

/**
 * End-to-end demo: starts the queue and topic consumers on background threads, waits
 * five seconds for them to subscribe, sends ten messages ({@code msg-0} .. {@code msg-9})
 * 100 ms apart, then waits one minute so the consumers can log what they receive.
 *
 * <p>The consumer threads are daemon threads: each consumer blocks for a very long
 * time, and non-daemon threads would keep the JVM alive after the test has returned.
 *
 * @throws JMSException if sending a message fails
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void sendMsg() throws JMSException, InterruptedException {
    Thread queueListener = new Thread(() -> {
        try {
            receiveMsg1();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
    queueListener.setDaemon(true);
    queueListener.start();

    Thread topicListener = new Thread(() -> {
        try {
            receiveMsg2();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
    topicListener.setDaemon(true);
    topicListener.start();

    // Give both consumers time to connect and subscribe before anything is sent.
    TimeUnit.SECONDS.sleep(5);
    for (int i = 0; i < 10; i++) {
        procedure.sendMsg("msg-" + i);
        Thread.sleep(100);
    }
    // Keep the test alive long enough for the listeners to log the deliveries.
    TimeUnit.MINUTES.sleep(1);
}

/**
 * Runs the queue consumer on the calling thread by delegating to
 * {@code queueConsumer.receiveMsg()}.
 *
 * @throws JMSException propagated from the consumer
 * @throws InterruptedException propagated from the consumer
 */
private void receiveMsg1() throws JMSException, InterruptedException {
          queueConsumer.receiveMsg();
}

/**
 * Runs the topic consumer on the calling thread by delegating to
 * {@code topicConsumer.receiveMsg()}.
 *
 * @throws JMSException propagated from the consumer
 * @throws InterruptedException propagated from the consumer
 */
private void receiveMsg2() throws JMSException, InterruptedException {
          topicConsumer.receiveMsg();
}

输出:

2022-07-20 22:02:34.055 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-0
2022-07-20 22:02:34.059 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-0
2022-07-20 22:02:34.057 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754025, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-0}
2022-07-20 22:02:34.134 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-1
2022-07-20 22:02:34.134 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754128, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-1}
2022-07-20 22:02:34.134 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-1
2022-07-20 22:02:34.244 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-2
2022-07-20 22:02:34.256 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-2
2022-07-20 22:02:34.270 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754230, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-2}
2022-07-20 22:02:34.335 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-3
2022-07-20 22:02:34.335 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754332, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-3}
2022-07-20 22:02:34.335 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-3
2022-07-20 22:02:34.435 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-4
2022-07-20 22:02:34.439 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-4
2022-07-20 22:02:34.453 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754433, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-4}
2022-07-20 22:02:34.539 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-5
2022-07-20 22:02:34.539 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:6, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754535, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-5}
2022-07-20 22:02:34.539 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-5
2022-07-20 22:02:34.638 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-6
2022-07-20 22:02:34.638 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-6
2022-07-20 22:02:34.652 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:7, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754636, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-6}
2022-07-20 22:02:34.738 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-7
2022-07-20 22:02:34.740 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-7
2022-07-20 22:02:34.740 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:8, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754736, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-7}
2022-07-20 22:02:34.839 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-8
2022-07-20 22:02:34.841 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-8
2022-07-20 22:02:34.852 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-51175-1595253745530-1:1:1:1:9, originalDestination = null, originalTransactionId = null, producerId = null, destination = composite-queue1,topic://composite-topic1, transactionId = null, expiration = 0, timestamp = 1595253754838, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-8}
2022-07-20 22:02:34.941 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeTopicConsumer : 25 - topic receive message:msg-9
2022-07-20 22:02:34.943 - [ActiveMQ Session Task-1] INFO c.j.a.d.c.CompositeQueueConsumer : 25 - queue receive message:msg-9

查看队列

 

查看主题

 

2 xml配置

<!--
  Broker-side composite queue: declares a virtual destination named MY.QUEUE whose
  forwardTo list contains the physical queues my-queue and my-queue2.
  NOTE(review): presumably this fragment belongs inside the <broker> element of
  activemq.xml; confirm against the broker configuration in use.
-->
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
          <queue physicalName="my-queue" />
          <queue physicalName="my-queue2" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>