美文网首页
ActiveMQ之Destination

ActiveMQ之Destination

作者: 爱健身的兔子 | 来源:发表于2021-01-04 15:10 被阅读0次

    1 Wildcard(通配符)

    Wildcard用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。

    ActiveMQ支持以下三种wildcard:

    • 用于作为路径上名字间的分隔符:.
    • 用于匹配路径上的任何名字:*
    • 用于递归地匹配任何以这个名字开始的destination:>

    示例,假设有两个Destination:PRICE.STOCK.NASDAQ.IBM 和 PRICE.STOCK.NYSE.SUNW。那么如下配置通配符:

    1. PRICE.>:表示匹配以PRICE.开头的任何Destination。

    2. PRICE.STOCK.>:表示匹配PRICE.STOCK.开头的任何Destination。

    3. PRICE.STOCK.NASDAQ.*:表示匹配 PRICE.STOCK.NASDAQ.下任何名称的Destination,只匹配当前层。PRICE.STOCK.NASDAQ.A能匹配,PRICE.STOCK.NASDAQ.A.B不能匹配。

    路径符号替换,将“/"替换".",添加如下插件:

    <plugins>
        <destinationPathSeparatorPlugin/>
    </plugins>
    

    2 Composite Destinations(组合目的地)

    组合队列Composite Destinations 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发送消息。 有两种实现方式:

    1. 在客户端编码实现。

    2. 在activemq.xml配置文件中实现。

    2.1 客户端编码实现

    在创建客户端连接的Destination时,多个destination之间采用","分隔。如下:

    private static final String queueName = "my-queue3,FOO.A,FOO.B";
    

    如果希望不同的Destination的话,需要加前缀“queue://”或者“topic://”。如下

    private static final String queueName = "FOO.A,topic://NOTIFY.FOO.B";
    
    2.2 activemq.xml配置文件中配置
    
    <broker xmlns="http://acti<《》vemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
            <destinationInterceptors>
               <virtualDestinationInterceptor>
                  <virtualDestinations>
                         <compositeQueue name="comQueue">
                                <forwardTo>
                                       <queue physicalName="queue.FOO" />
                                       <topic physicalName="topic.FOO" />
                                </forwardTo>
                         </compositeQueue>
                  </virtualDestinations>
               </virtualDestinationInterceptor>
            </destinationInterceptors>
    </broker>
    

    客户端发送示例:

    private static final String queueName = "comQueue";
    

    当客户端向“comQueue”发送消息时,消息就会同时发送到"queue.FOO"和“topic.FOO”中。

    2.3使用filtered destinations

    通过配置selecter来过滤掉不需要的消息,如下:

    <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> 
       <compositeQueue name="MY.QUEUE">
        <forwardTo>
         <filteredDestination selector="odd = 'yes'" queue="FOO"/>
         <filteredDestination selector="i = 5" topic="BAR"/>
        </forwardTo>
      </compositeQueue>
    </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
    

    3 Configure Startup Destinations(启动创建队列和主题)

    在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemq.xml中配置:

    
     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
           <destinations>
                  <queue physicalName="autoqueue" />
                  <topic physicalName="autotopic" />
           </destinations>
     </broker>
    

    4 Delete Inactive Destinations(删除没有消息的队列或主题)

    在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收相应资源。

     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
            <destinationPolicy>
                  <policyMap>
                    <policyEntries>
                         <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                    </policyEntries>
                  </policyMap>
           </destinationPolicy>
     </broker>
    

    参数说明:

    schedulePeriodForDestinationPurge:设置多长时间检查一次,单位是毫秒。
    inactiveTimoutBeforeGC: 设置当Destination为空后,多长时间被删除,单位是毫秒。
    gcInactiveDestinations:设置删除掉不活动的队列,默认为false。

    5 Destination Options

    Destination Options是一种无需扩展JMS API即可向JMS消费者提供扩展配置选项的方法。这些选项使用URL查询语法在创建消费者的目标名称中进行编码。

    queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
    consumer = session.createConsumer(queue);
    

    消费者选项如下:

    选项名称 默认值 描述
    consumer.dispatchAsync true broker是否异步分发
    consumer.exclusive false 是否为独占消费者
    consumer.maximumPendingMessageLimit 0 用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量。
    consumer.noLocal false 与Topic消费者中的noLocal标志相同。 暴露在这里,以便它可以与队列一起使用。
    consumer.prefetchSize n/a 消费者持有的未确认的最大消费数量
    consumer.priority 0 设置消费者的优先级
    consumer.retroactive false 是否为回溯消费者
    consumer.selector null 配置JMS选择器

    6 Virtual Destination(虚拟目的地)

    虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination。

    ActiveMQ支持2种方式:

    1. 虚拟主题(Virtual Topics)。

    2. 组合Destinations(Composite Destinations)上文已经介绍过了。

    6.1 为何使用虚拟主题

    ActiveMQ中,topic只有在持久订阅下才会持久化消息。JMS持久化订阅者 MessageConsumer 在创建时会生成一个 唯一的 JMS clientID 和一个持久化订阅者的name。 为了符合JMS,对于一个JMSclientID,同一时间,只能有一个活跃的JMS连接,并且,对于一个clientID和订阅者name而言,只有一个消费者可以是活跃的。也就是说,对于给定的topic订阅者来说,只能由一个活跃的进程进行消费。这意味着我们不能实现:

    • 消息的负载均衡
    • 当一个消费者进程死亡时 订阅者的快速故障转移。

    如今,JMS中的队列语义提供了通过多个消费者来实现可靠的负载均衡——允许多线程,进程和机器来处理消息。然后我们就有了成熟而复杂的负载均衡技术,就像Messge Groups一样在维护订单时进行负载均衡和并行化处理。如下图所示,全量的消息先发送到队列,然后再分发给消费者,通过队列来解决负载均衡和故障转移问题。

    6.2 如何使用虚拟主题

    virtual topic背后的思想是生产者用正常的JMS方式把消息发送到一个topic。消费者可以继续使用JMS标准中的Topic语义。然而,如果 topic 是虚拟的, 订阅一个逻辑topic的消费者可以从一个真实队列中消费,允许多个消费者分布在多个机器上且多线程的进行负载均衡。 例如,假定我们现在有一个叫做 VirtualTopic.Orders 的topic。(前缀 VirtualTopic. 表明这是一个 virtual topic)。

    Topicdestination = session.createTopic("VirtualTopic.Orders");
    

    我们希望将orders 发送到系统A 和 系统B。于是,用通常的 durable topics ,我们需要为 clientID_A 和 "A" 创建一个JMS消费者,同时为 clientID_B 和 "B"创建一个JMS消费者。 有了virtual topics 我们可以直接从队列 Consumer.A.VirtualTopic.Orders 中为系统A 消费,或者从队列 Consumer.B.VirtualTopic.Orders 中为系统B 消费。

    Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");
    
    6.3 修改虚拟主题前缀

    默认前缀是:VirtualTopic.>

    自定义消费虚拟地址默认格式:Consumer..VirtualTopic.>*

    下面的示例演示如何使所有主题成为虚拟主题。下面的示例使用名称“>“”表示“匹配所有主题”,“VirtualTopicConsumers”为消费队列的前缀。

    <broker xmlns="http://activemq.apache.org/schema/core">
           <destinationInterceptors>
                  <virtualDestinationInterceptor>
                         <virtualDestinations>
                                <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" />
                         </virtualDestinations>
                  </virtualDestinationInterceptor>
           </destinationInterceptors>
    </broker>
    

    其他的配置参数:

    选项 默认值 描述
    selectorAwae false 只有与现有订阅者之一匹配的消息才会被实际分发到队列。使用此选项可防止在独占消费者使用选择器时生成不匹配的消息。
    local false 如果为true,则不要扇出通过网络接收的消息
    concurrentSend false 如果为true,则使用执行器扇出以使发送并行进行。这允许日志批量写入,从而减少磁盘io(5.12)
    transactedSend false 如果为true,则将事务用于扇出发送,以便有单个磁盘同步。如果没有客户端事务,则将创建本地代理事务(5.13)
    dropOnResourceLimit false 如果为true,则忽略扇出期间抛出的任何ResourceAllocationException(请参阅:sendFailIfNoSpace策略条目)(5.16)
    setOriginalDestination true 如果为true,则转发消息的目的地设置为消费者队列,并且originalDestination消息属性跟随虚拟主题(5.16)
    6.4 broker网络中避免重复消息

    通常情况下,使用网络连接消费者队列。在这种情况下,不要在虚拟主题上桥接任何普通主题消费者,因为任何转发的消息都会再次分散到网络代理上的消费者队列,从而导致消息重复。下面示例展示了如何排除虚拟队列:

    <networkConnectors> <networkConnector uri="static://([tcp://localhost:61617](tcp://localhost:61617))">
     <excludedDestinations> 
     <queue physicalName="Consumer.*.VirtualTopic.>"/> 
     </excludedDestinations> 
    </networkConnector> </networkConnectors>
    

    7 Mirrored Queues(镜像队列)

    ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。 使用ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的“useMirroredQueues“属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是“VirtualTopic.Mirror.”。如下订阅“*.qmirror”的主题就开启了mirrored queue。

    
    <broker xmlns=http://activemq.apache.org/schema/core useMirroredQueue="true">
    
        <destinationInterceptors>
                  <mirroredQueue copyMessage="true" postfix=".qmirror" prefix="" />
        </destinationInterceptors>
    
    </broker>
    

    http://activemq.apache.org/features

    相关文章

      网友评论

          本文标题:ActiveMQ之Destination

          本文链接:https://www.haomeiwen.com/subject/sqdvoktx.html