美文网首页
Destination高级特性

Destination高级特性

作者: guqj | 来源:发表于2017-12-09 23:56 被阅读0次

    长恨人生不如水,等闲平地起波澜。 —— 刘禹锡

    1. Wildcards(通配符)

      Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
      ActiveMQ支持以下三种通配符:

    • ".":用于作为路径上名字间的分隔符
    • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
    • "*":用于作为路径上任何名字。

      举例来说,如有以下两个Destination:
      PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
      PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)

      1. PRICE.> :匹配任何产品的价格变动
      2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
      3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
      4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
      JAVA代码示例:
      生产者:

    // 实例化连接工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
    // 通过连接工厂获取连接
    Connection connection = connectionFactory.createConnection();
    // 启动连接
    connection.start();
    // 创建session
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    // 创建队列
    Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
    // 创建生产者
    MessageProducer messageProducer = session.createProducer(destination);
    for (int i = 1; i <= 10; i++) {
        TextMessage textMessage = session.createTextMessage(message);
        messageProducer.send("Mac Air价格:"  + i * 1000);
        System.out.println("发送消息 - " + textMessage.getText());
    }
    session.commit();
    

      消费者:

    // 实例化连接工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
    // 通过连接工厂获取连接
    Connection connection = connectionFactory.createConnection();
    // 启动连接
    connection.start();
    // 创建session
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    // 创建队列
    Destination destination = session.createQueue("PRICE.COMPUTER.>");
    // 创建消费者
    MessageConsumer messageConsumer = session.createConsumer(destination);
    messageConsumer.setMessageListener(new MessageListener(){
      @Override
        public void onMessage(Message message) {
            try {
                System.out.println("收到的消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    

      生产者发送消息日志:

    发送消息:Mac Air价格:1000
    发送消息:Mac Air价格:2000
    发送消息:Mac Air价格:3000
    发送消息:Mac Air价格:4000
    发送消息:Mac Air价格:5000
    发送消息:Mac Air价格:6000
    发送消息:Mac Air价格:7000
    发送消息:Mac Air价格:8000
    发送消息:Mac Air价格:9000
    发送消息:Mac Air价格:10000
    

      消费者接收到的消息日志:

    收到的消息:Mac Air价格:1000
    收到的消息:Mac Air价格:2000
    收到的消息:Mac Air价格:3000
    收到的消息:Mac Air价格:4000
    收到的消息:Mac Air价格:5000
    收到的消息:Mac Air价格:6000
    收到的消息:Mac Air价格:7000
    收到的消息:Mac Air价格:8000
    收到的消息:Mac Air价格:9000
    收到的消息:Mac Air价格:10000
    

      从消费者的接收日志来看,当消费者使用通配符队列时,是能正常接收消息的。

    通配符中是为消费者服务的。即:通配符只能配置在消费端。

    2. 组合列队(Composite Destinations)

      组合列队即通过一个虚拟的Destination代表多个Destination,这样就可以通过该虚拟的Destination同时向多个Destination发送消息。
      实现虚拟Destination可通过以下方式实现:
      (1) 客户端方式
      生产者:

    Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,PRICE.COMPUTER.TMALL.APPLE");
    // 多个Destination使用,分隔
    

      消费者:

    Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
    // 或者
    Destination destination = session.createQueue("PRICE.COMPUTER.TMALL.APPLE");
    
    // 以上两个消费者都能接收到生产者的消息
    

    若使用不同类型的destination,那么需要加上前缀如queue:// 或topic://。如:Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,topic://PRICE.COMPUTER.TMALL.APPLE");

      (2) 配置方式
      在activemq.xml的broker中添加以下配置:

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="PRICE.COMPUTER">
                      <forwardTo>
                        <queue physicalName="PRICE.COMPUTER.JD.APPLE"/>
                        <queue physicalName="PRICE.COMPUTER.TMALL.APPLE" />
                       </forwardTo>
                </compositeQueue>
            </virtualDestinations>
         </virtualDestinationInterceptor>
    </destinationInterceptors>
    

      然后在生产者中,直接向队列PRICE.COMPUTER中发送消息。那么监听PRICE.COMPUTER.JD.APPLE或PRICE.COMPUTER.TMALL.APPLE的消费者都能同时接收到生产者发送的消息。

    注意:修改完配置文件后,须重启Broker服务,不然配置是不会生效的!

    3. 配置启动队列

      若需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

    <destinations>
        <queue physicalName="PRICE.COMPUTER.JD.APPLE" />
        <topic physicalName="PRICE.COMPUTER.TMALL.APPLE" />
    </destinations>
    

    4. 队列选项

      队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
       1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
       2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
      3:consumer.noLocal :默认false。
       4:consumer.dispatchAsync :是否异步分发 ,默认true。
      5:consumer.retroactive:是否为回溯消费者 ,默认false。
       6:consumer.selector:Jms的Selector,默认null。
       7:consumer.exclusive:是否为独占消费者 ,默认false。
       8:consumer.priority:设置消费者的优先级,默认0。

      示例如下:

    queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
    consumer = session.createConsumer(queue);
    

    5. 虚拟Destination

      ActiveMQ支持虚拟Destination有以下两种方式:
      (1) 虚拟主题
      (2) 组合Destinations(前面已介绍过)

    5.1 为何使用虚拟主题

      ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
      (1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。
      (2) 由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

      在ActiveMQ,可以通过虚拟主题来解决以上两个问题。

    5.2 虚拟主题的使用

      对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:

    Topic destination = session.createTopic("VirtualTopic.Mobille");
    

      对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:
      应用A的消费者:

    Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mobile");
    

    应用A的消费者启动两个及以上,下面称为A1、A2.

      应用B的消费者:

    Destination destination = session.createQueue("Consumer.B.VirtualTopic.Mobile");
    

      应用A的消费者和应用B的消费者依次启动订阅,发布者发送消息。

      应用A消费者A1消费消息日志:

    收到的消息:ActiveMQ 发送的消息:10
    收到的消息:ActiveMQ 发送的消息:12
    收到的消息:ActiveMQ 发送的消息:14
    收到的消息:ActiveMQ 发送的消息:16
    收到的消息:ActiveMQ 发送的消息:18
    

      应用A消费者A2消费消息日志:

    收到的消息:ActiveMQ 发送的消息:11
    收到的消息:ActiveMQ 发送的消息:13
    收到的消息:ActiveMQ 发送的消息:15
    收到的消息:ActiveMQ 发送的消息:17
    收到的消息:ActiveMQ 发送的消息:19
    

      应用B消费者消息日志:

    收到的消息:ActiveMQ 发送的消息:10
    收到的消息:ActiveMQ 发送的消息:11
    收到的消息:ActiveMQ 发送的消息:12
    收到的消息:ActiveMQ 发送的消息:13
    收到的消息:ActiveMQ 发送的消息:14
    收到的消息:ActiveMQ 发送的消息:15
    收到的消息:ActiveMQ 发送的消息:16
    收到的消息:ActiveMQ 发送的消息:17
    收到的消息:ActiveMQ 发送的消息:18
    收到的消息:ActiveMQ 发送的消息:19
    

      从以上消费者日志可以看出,应用A的A1、A2两个消费者共同消费了10条消息,而应用B的消费者也同样消费了10条消息,但应用B只要一个消费者。

    注意:若是activemq.xml中配置了destinationInterceptors,则需要将destinationInterceptors删除,然后重启Broker服务。

      虚拟主题的前缀默认是VirtualTopic,若是需要修改,可通过以下方式修改:

    <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
                </virtualDestinations>
            </virtualDestinationInterceptor>
    </destinationInterceptors>
    

    相关文章

      网友评论

          本文标题:Destination高级特性

          本文链接:https://www.haomeiwen.com/subject/fwipixtx.html