美文网首页
ActiveMQ之Destination

ActiveMQ之Destination

作者: 爱健身的兔子 | 来源:发表于2021-01-04 15:10 被阅读0次

1 Wildcard(通配符)

Wildcard用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。

ActiveMQ支持以下三种wildcard:

  • 用于作为路径上名字间的分隔符:.
  • 用于匹配路径上的任何名字:*
  • 用于递归地匹配任何以这个名字开始的destination:>

示例,假设有两个Destination:PRICE.STOCK.NASDAQ.IBM 和 PRICE.STOCK.NYSE.SUNW。那么如下配置通配符:

  1. PRICE.>:表示匹配以PRICE.开头的任何Destination。

  2. PRICE.STOCK.>:表示匹配PRICE.STOCK.开头的任何Destination。

  3. PRICE.STOCK.NASDAQ.*:表示匹配 PRICE.STOCK.NASDAQ.下任何名称的Destination,只匹配当前层。PRICE.STOCK.NASDAQ.A能匹配,PRICE.STOCK.NASDAQ.A.B不能匹配。

路径符号替换,将“/"替换".",添加如下插件:

<plugins>
    <destinationPathSeparatorPlugin/>
</plugins>

2 Composite Destinations(组合目的地)

组合队列Composite Destinations 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发送消息。 有两种实现方式:

  1. 在客户端编码实现。

  2. 在activemq.xml配置文件中实现。

2.1 客户端编码实现

在创建客户端连接的Destination时,多个destination之间采用","分隔。如下:

private static final String queueName = "my-queue3,FOO.A,FOO.B";

如果希望不同的Destination的话,需要加前缀“queue://”或者“topic://”。如下

private static final String queueName = "FOO.A,topic://NOTIFY.FOO.B";
2.2 activemq.xml配置文件中配置

<broker xmlns="http://acti<《》vemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
        <destinationInterceptors>
           <virtualDestinationInterceptor>
              <virtualDestinations>
                     <compositeQueue name="comQueue">
                            <forwardTo>
                                   <queue physicalName="queue.FOO" />
                                   <topic physicalName="topic.FOO" />
                            </forwardTo>
                     </compositeQueue>
              </virtualDestinations>
           </virtualDestinationInterceptor>
        </destinationInterceptors>
</broker>

客户端发送示例:

private static final String queueName = "comQueue";

当客户端向“comQueue”发送消息时,消息就会同时发送到"queue.FOO"和“topic.FOO”中。

2.3使用filtered destinations

通过配置selecter来过滤掉不需要的消息,如下:

<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> 
   <compositeQueue name="MY.QUEUE">
    <forwardTo>
     <filteredDestination selector="odd = 'yes'" queue="FOO"/>
     <filteredDestination selector="i = 5" topic="BAR"/>
    </forwardTo>
  </compositeQueue>
</virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>

3 Configure Startup Destinations(启动创建队列和主题)

在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemq.xml中配置:


 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
       <destinations>
              <queue physicalName="autoqueue" />
              <topic physicalName="autotopic" />
       </destinations>
 </broker>

4 Delete Inactive Destinations(删除没有消息的队列或主题)

在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收相应资源。

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
        <destinationPolicy>
              <policyMap>
                <policyEntries>
                     <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                </policyEntries>
              </policyMap>
       </destinationPolicy>
 </broker>

参数说明:

schedulePeriodForDestinationPurge:设置多长时间检查一次,单位是毫秒。
inactiveTimoutBeforeGC: 设置当Destination为空后,多长时间被删除,单位是毫秒。
gcInactiveDestinations:设置删除掉不活动的队列,默认为false。

5 Destination Options

Destination Options是一种无需扩展JMS API即可向JMS消费者提供扩展配置选项的方法。这些选项使用URL查询语法在创建消费者的目标名称中进行编码。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);

消费者选项如下:

选项名称 默认值 描述
consumer.dispatchAsync true broker是否异步分发
consumer.exclusive false 是否为独占消费者
consumer.maximumPendingMessageLimit 0 用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量。
consumer.noLocal false 与Topic消费者中的noLocal标志相同。 暴露在这里,以便它可以与队列一起使用。
consumer.prefetchSize n/a 消费者持有的未确认的最大消费数量
consumer.priority 0 设置消费者的优先级
consumer.retroactive false 是否为回溯消费者
consumer.selector null 配置JMS选择器

6 Virtual Destination(虚拟目的地)

虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination。

ActiveMQ支持2种方式:

  1. 虚拟主题(Virtual Topics)。

  2. 组合Destinations(Composite Destinations)上文已经介绍过了。

6.1 为何使用虚拟主题

ActiveMQ中,topic只有在持久订阅下才会持久化消息。JMS持久化订阅者 MessageConsumer 在创建时会生成一个 唯一的 JMS clientID 和一个持久化订阅者的name。 为了符合JMS,对于一个JMSclientID,同一时间,只能有一个活跃的JMS连接,并且,对于一个clientID和订阅者name而言,只有一个消费者可以是活跃的。也就是说,对于给定的topic订阅者来说,只能由一个活跃的进程进行消费。这意味着我们不能实现:

  • 消息的负载均衡
  • 当一个消费者进程死亡时 订阅者的快速故障转移。

如今,JMS中的队列语义提供了通过多个消费者来实现可靠的负载均衡——允许多线程,进程和机器来处理消息。然后我们就有了成熟而复杂的负载均衡技术,就像Messge Groups一样在维护订单时进行负载均衡和并行化处理。如下图所示,全量的消息先发送到队列,然后再分发给消费者,通过队列来解决负载均衡和故障转移问题。

6.2 如何使用虚拟主题

virtual topic背后的思想是生产者用正常的JMS方式把消息发送到一个topic。消费者可以继续使用JMS标准中的Topic语义。然而,如果 topic 是虚拟的, 订阅一个逻辑topic的消费者可以从一个真实队列中消费,允许多个消费者分布在多个机器上且多线程的进行负载均衡。 例如,假定我们现在有一个叫做 VirtualTopic.Orders 的topic。(前缀 VirtualTopic. 表明这是一个 virtual topic)。

Topicdestination = session.createTopic("VirtualTopic.Orders");

我们希望将orders 发送到系统A 和 系统B。于是,用通常的 durable topics ,我们需要为 clientID_A 和 "A" 创建一个JMS消费者,同时为 clientID_B 和 "B"创建一个JMS消费者。 有了virtual topics 我们可以直接从队列 Consumer.A.VirtualTopic.Orders 中为系统A 消费,或者从队列 Consumer.B.VirtualTopic.Orders 中为系统B 消费。

Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");
6.3 修改虚拟主题前缀

默认前缀是:VirtualTopic.>

自定义消费虚拟地址默认格式:Consumer..VirtualTopic.>*

下面的示例演示如何使所有主题成为虚拟主题。下面的示例使用名称“>“”表示“匹配所有主题”,“VirtualTopicConsumers”为消费队列的前缀。

<broker xmlns="http://activemq.apache.org/schema/core">
       <destinationInterceptors>
              <virtualDestinationInterceptor>
                     <virtualDestinations>
                            <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" />
                     </virtualDestinations>
              </virtualDestinationInterceptor>
       </destinationInterceptors>
</broker>

其他的配置参数:

选项 默认值 描述
selectorAwae false 只有与现有订阅者之一匹配的消息才会被实际分发到队列。使用此选项可防止在独占消费者使用选择器时生成不匹配的消息。
local false 如果为true,则不要扇出通过网络接收的消息
concurrentSend false 如果为true,则使用执行器扇出以使发送并行进行。这允许日志批量写入,从而减少磁盘io(5.12)
transactedSend false 如果为true,则将事务用于扇出发送,以便有单个磁盘同步。如果没有客户端事务,则将创建本地代理事务(5.13)
dropOnResourceLimit false 如果为true,则忽略扇出期间抛出的任何ResourceAllocationException(请参阅:sendFailIfNoSpace策略条目)(5.16)
setOriginalDestination true 如果为true,则转发消息的目的地设置为消费者队列,并且originalDestination消息属性跟随虚拟主题(5.16)
6.4 broker网络中避免重复消息

通常情况下,使用网络连接消费者队列。在这种情况下,不要在虚拟主题上桥接任何普通主题消费者,因为任何转发的消息都会再次分散到网络代理上的消费者队列,从而导致消息重复。下面示例展示了如何排除虚拟队列:

<networkConnectors> <networkConnector uri="static://([tcp://localhost:61617](tcp://localhost:61617))">
 <excludedDestinations> 
 <queue physicalName="Consumer.*.VirtualTopic.>"/> 
 </excludedDestinations> 
</networkConnector> </networkConnectors>

7 Mirrored Queues(镜像队列)

ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。 使用ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的“useMirroredQueues“属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是“VirtualTopic.Mirror.”。如下订阅“*.qmirror”的主题就开启了mirrored queue。


<broker xmlns=http://activemq.apache.org/schema/core useMirroredQueue="true">

    <destinationInterceptors>
              <mirroredQueue copyMessage="true" postfix=".qmirror" prefix="" />
    </destinationInterceptors>

</broker>

http://activemq.apache.org/features

相关文章

网友评论

      本文标题:ActiveMQ之Destination

      本文链接:https://www.haomeiwen.com/subject/sqdvoktx.html