美文网首页
Destination高级特性

Destination高级特性

作者: guqj | 来源:发表于2017-12-09 23:56 被阅读0次

长恨人生不如水,等闲平地起波澜。 —— 刘禹锡

1. Wildcards(通配符)

  Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
  ActiveMQ支持以下三种通配符:

  • ".":用于作为路径上名字间的分隔符
  • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
  • "*":用于作为路径上任何名字。

  举例来说,如有以下两个Destination:
  PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
  PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)

  1. PRICE.> :匹配任何产品的价格变动
  2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
  3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
  4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
  JAVA代码示例:
  生产者:

// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 创建生产者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 1; i <= 10; i++) {
    TextMessage textMessage = session.createTextMessage(message);
    messageProducer.send("Mac Air价格:"  + i * 1000);
    System.out.println("发送消息 - " + textMessage.getText());
}
session.commit();

  消费者:

// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.>");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener(){
  @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

  生产者发送消息日志:

发送消息:Mac Air价格:1000
发送消息:Mac Air价格:2000
发送消息:Mac Air价格:3000
发送消息:Mac Air价格:4000
发送消息:Mac Air价格:5000
发送消息:Mac Air价格:6000
发送消息:Mac Air价格:7000
发送消息:Mac Air价格:8000
发送消息:Mac Air价格:9000
发送消息:Mac Air价格:10000

  消费者接收到的消息日志:

收到的消息:Mac Air价格:1000
收到的消息:Mac Air价格:2000
收到的消息:Mac Air价格:3000
收到的消息:Mac Air价格:4000
收到的消息:Mac Air价格:5000
收到的消息:Mac Air价格:6000
收到的消息:Mac Air价格:7000
收到的消息:Mac Air价格:8000
收到的消息:Mac Air价格:9000
收到的消息:Mac Air价格:10000

  从消费者的接收日志来看,当消费者使用通配符队列时,是能正常接收消息的。

通配符中是为消费者服务的。即:通配符只能配置在消费端。

2. 组合列队(Composite Destinations)

  组合列队即通过一个虚拟的Destination代表多个Destination,这样就可以通过该虚拟的Destination同时向多个Destination发送消息。
  实现虚拟Destination可通过以下方式实现:
  (1) 客户端方式
  生产者:

Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,PRICE.COMPUTER.TMALL.APPLE");
// 多个Destination使用,分隔

  消费者:

Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 或者
Destination destination = session.createQueue("PRICE.COMPUTER.TMALL.APPLE");

// 以上两个消费者都能接收到生产者的消息

若使用不同类型的destination,那么需要加上前缀如queue:// 或topic://。如:Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,topic://PRICE.COMPUTER.TMALL.APPLE");

  (2) 配置方式
  在activemq.xml的broker中添加以下配置:

<destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
            <compositeQueue name="PRICE.COMPUTER">
                  <forwardTo>
                    <queue physicalName="PRICE.COMPUTER.JD.APPLE"/>
                    <queue physicalName="PRICE.COMPUTER.TMALL.APPLE" />
                   </forwardTo>
            </compositeQueue>
        </virtualDestinations>
     </virtualDestinationInterceptor>
</destinationInterceptors>

  然后在生产者中,直接向队列PRICE.COMPUTER中发送消息。那么监听PRICE.COMPUTER.JD.APPLE或PRICE.COMPUTER.TMALL.APPLE的消费者都能同时接收到生产者发送的消息。

注意:修改完配置文件后,须重启Broker服务,不然配置是不会生效的!

3. 配置启动队列

  若需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

<destinations>
    <queue physicalName="PRICE.COMPUTER.JD.APPLE" />
    <topic physicalName="PRICE.COMPUTER.TMALL.APPLE" />
</destinations>

4. 队列选项

  队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
   1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
   2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
  3:consumer.noLocal :默认false。
   4:consumer.dispatchAsync :是否异步分发 ,默认true。
  5:consumer.retroactive:是否为回溯消费者 ,默认false。
   6:consumer.selector:Jms的Selector,默认null。
   7:consumer.exclusive:是否为独占消费者 ,默认false。
   8:consumer.priority:设置消费者的优先级,默认0。

  示例如下:

queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
consumer = session.createConsumer(queue);

5. 虚拟Destination

  ActiveMQ支持虚拟Destination有以下两种方式:
  (1) 虚拟主题
  (2) 组合Destinations(前面已介绍过)

5.1 为何使用虚拟主题

  ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
  (1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。
  (2) 由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

  在ActiveMQ,可以通过虚拟主题来解决以上两个问题。

5.2 虚拟主题的使用

  对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:

Topic destination = session.createTopic("VirtualTopic.Mobille");

  对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:
  应用A的消费者:

Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mobile");

应用A的消费者启动两个及以上,下面称为A1、A2.

  应用B的消费者:

Destination destination = session.createQueue("Consumer.B.VirtualTopic.Mobile");

  应用A的消费者和应用B的消费者依次启动订阅,发布者发送消息。

  应用A消费者A1消费消息日志:

收到的消息:ActiveMQ 发送的消息:10
收到的消息:ActiveMQ 发送的消息:12
收到的消息:ActiveMQ 发送的消息:14
收到的消息:ActiveMQ 发送的消息:16
收到的消息:ActiveMQ 发送的消息:18

  应用A消费者A2消费消息日志:

收到的消息:ActiveMQ 发送的消息:11
收到的消息:ActiveMQ 发送的消息:13
收到的消息:ActiveMQ 发送的消息:15
收到的消息:ActiveMQ 发送的消息:17
收到的消息:ActiveMQ 发送的消息:19

  应用B消费者消息日志:

收到的消息:ActiveMQ 发送的消息:10
收到的消息:ActiveMQ 发送的消息:11
收到的消息:ActiveMQ 发送的消息:12
收到的消息:ActiveMQ 发送的消息:13
收到的消息:ActiveMQ 发送的消息:14
收到的消息:ActiveMQ 发送的消息:15
收到的消息:ActiveMQ 发送的消息:16
收到的消息:ActiveMQ 发送的消息:17
收到的消息:ActiveMQ 发送的消息:18
收到的消息:ActiveMQ 发送的消息:19

  从以上消费者日志可以看出,应用A的A1、A2两个消费者共同消费了10条消息,而应用B的消费者也同样消费了10条消息,但应用B只要一个消费者。

注意:若是activemq.xml中配置了destinationInterceptors,则需要将destinationInterceptors删除,然后重启Broker服务。

  虚拟主题的前缀默认是VirtualTopic,若是需要修改,可通过以下方式修改:

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
</destinationInterceptors>

相关文章

  • Destination高级特性

    长恨人生不如水,等闲平地起波澜。 —— 刘禹锡 1. Wildcards(通配符)   Wildcars用来支持...

  • 使用Netty,我们到底在开发些什么?

    您可能感兴趣的文章: 大数据成神之路系列:Java高级特性增强-集合Java高级特性增强-多线程Java高级特性增...

  • 高级特性

    1.切片:取一个list或tuple的部分元素是非常常见的操作,比如说取list中的前n个元素,我们用循环来实现这...

  • 高级特性

    1.切片(Slice)# python提供了切片(Slice)操作符,能大大简化取数据操作L = ['aa','b...

  • 高级特性

    一、切片(Slice)对list、tuple、str进行截取的方法 L=[1,1,42,65,7,3,8] 1.L...

  • 高级特性

    1. 切片 Python提供了切片(Slice)操作符: ============================...

  • 高级特性

    切片 迭代 列表生成式 生成器 在Python中,一边循环一边计算的机制,称为生成器。 迭代器 凡是可作用于for...

  • 高级特性

    切片(从列表中取出元素) 循环: 相比之下使用Python的Slice操作符可以大大简化。类似于MATLAB的向量...

  • 高级特性

  • 高级特性

    切片 取指定索引范围的操作。 迭代 只要是可迭代对象,无论有无下标,都可以迭代 列表生成式 可以用来创建list的...

网友评论

      本文标题:Destination高级特性

      本文链接:https://www.haomeiwen.com/subject/fwipixtx.html