1 Wildcard(通配符)
Wildcard用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
ActiveMQ支持以下三种wildcard:
- 用于作为路径上名字间的分隔符:
.
- 用于匹配路径上的任何名字:
*
- 用于递归地匹配任何以这个名字开始的destination:
>
示例,假设有两个Destination:PRICE.STOCK.NASDAQ.IBM 和 PRICE.STOCK.NYSE.SUNW。那么如下配置通配符:
-
PRICE.>:表示匹配以PRICE.开头的任何Destination。
-
PRICE.STOCK.>:表示匹配PRICE.STOCK.开头的任何Destination。
-
PRICE.STOCK.NASDAQ.*:表示匹配 PRICE.STOCK.NASDAQ.下任何名称的Destination,只匹配当前层。PRICE.STOCK.NASDAQ.A能匹配,PRICE.STOCK.NASDAQ.A.B不能匹配。
路径符号替换,将“/"替换".",添加如下插件:
<plugins>
<destinationPathSeparatorPlugin/>
</plugins>
2 Composite Destinations(组合目的地)
组合队列Composite Destinations 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发送消息。 有两种实现方式:
-
在客户端编码实现。
-
在activemq.xml配置文件中实现。
2.1 客户端编码实现
在创建客户端连接的Destination时,多个destination之间采用","分隔。如下:
private static final String queueName = "my-queue3,FOO.A,FOO.B";
如果希望不同的Destination的话,需要加前缀“queue://”或者“topic://”。如下
private static final String queueName = "FOO.A,topic://NOTIFY.FOO.B";
2.2 activemq.xml配置文件中配置
<broker xmlns="http://acti<《》vemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="comQueue">
<forwardTo>
<queue physicalName="queue.FOO" />
<topic physicalName="topic.FOO" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
客户端发送示例:
private static final String queueName = "comQueue";
当客户端向“comQueue”发送消息时,消息就会同时发送到"queue.FOO"和“topic.FOO”中。
2.3使用filtered destinations
通过配置selecter来过滤掉不需要的消息,如下:
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="FOO"/>
<filteredDestination selector="i = 5" topic="BAR"/>
</forwardTo>
</compositeQueue>
</virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
3 Configure Startup Destinations(启动创建队列和主题)
在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemq.xml中配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
<destinations>
<queue physicalName="autoqueue" />
<topic physicalName="autotopic" />
</destinations>
</broker>
4 Delete Inactive Destinations(删除没有消息的队列或主题)
在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收相应资源。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
参数说明:
schedulePeriodForDestinationPurge:设置多长时间检查一次,单位是毫秒。
inactiveTimoutBeforeGC: 设置当Destination为空后,多长时间被删除,单位是毫秒。
gcInactiveDestinations:设置删除掉不活动的队列,默认为false。
5 Destination Options
Destination Options是一种无需扩展JMS API即可向JMS消费者提供扩展配置选项的方法。这些选项使用URL查询语法在创建消费者的目标名称中进行编码。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
消费者选项如下:
选项名称 | 默认值 | 描述 |
---|---|---|
consumer.dispatchAsync |
true |
broker是否异步分发 |
consumer.exclusive |
false |
是否为独占消费者 |
consumer.maximumPendingMessageLimit |
0 |
用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量。 |
consumer.noLocal |
false |
与Topic消费者中的noLocal标志相同。 暴露在这里,以便它可以与队列一起使用。 |
consumer.prefetchSize |
n/a |
消费者持有的未确认的最大消费数量 |
consumer.priority |
0 |
设置消费者的优先级 |
consumer.retroactive |
false |
是否为回溯消费者 |
consumer.selector |
null |
配置JMS选择器 |
6 Virtual Destination(虚拟目的地)
虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination。
ActiveMQ支持2种方式:
-
虚拟主题(Virtual Topics)。
-
组合Destinations(Composite Destinations)上文已经介绍过了。
6.1 为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才会持久化消息。JMS持久化订阅者 MessageConsumer 在创建时会生成一个 唯一的 JMS clientID 和一个持久化订阅者的name。 为了符合JMS,对于一个JMSclientID,同一时间,只能有一个活跃的JMS连接,并且,对于一个clientID和订阅者name而言,只有一个消费者可以是活跃的。也就是说,对于给定的topic订阅者来说,只能由一个活跃的进程进行消费。这意味着我们不能实现:
- 消息的负载均衡
- 当一个消费者进程死亡时 订阅者的快速故障转移。
如今,JMS中的队列语义提供了通过多个消费者来实现可靠的负载均衡——允许多线程,进程和机器来处理消息。然后我们就有了成熟而复杂的负载均衡技术,就像Messge Groups一样在维护订单时进行负载均衡和并行化处理。如下图所示,全量的消息先发送到队列,然后再分发给消费者,通过队列来解决负载均衡和故障转移问题。
6.2 如何使用虚拟主题
virtual topic背后的思想是生产者用正常的JMS方式把消息发送到一个topic。消费者可以继续使用JMS标准中的Topic语义。然而,如果 topic 是虚拟的, 订阅一个逻辑topic的消费者可以从一个真实队列中消费,允许多个消费者分布在多个机器上且多线程的进行负载均衡。 例如,假定我们现在有一个叫做 VirtualTopic.Orders 的topic。(前缀 VirtualTopic. 表明这是一个 virtual topic)。
Topicdestination = session.createTopic("VirtualTopic.Orders");
我们希望将orders 发送到系统A 和 系统B。于是,用通常的 durable topics ,我们需要为 clientID_A 和 "A" 创建一个JMS消费者,同时为 clientID_B 和 "B"创建一个JMS消费者。 有了virtual topics 我们可以直接从队列 Consumer.A.VirtualTopic.Orders 中为系统A 消费,或者从队列 Consumer.B.VirtualTopic.Orders 中为系统B 消费。
Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");
6.3 修改虚拟主题前缀
默认前缀是:VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer..VirtualTopic.>*
下面的示例演示如何使所有主题成为虚拟主题。下面的示例使用名称“>“”表示“匹配所有主题”,“VirtualTopicConsumers”为消费队列的前缀。
<broker xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
其他的配置参数:
选项 | 默认值 | 描述 |
---|---|---|
selectorAwae |
false |
只有与现有订阅者之一匹配的消息才会被实际分发到队列。使用此选项可防止在独占消费者使用选择器时生成不匹配的消息。 |
local |
false |
如果为true,则不要扇出通过网络接收的消息 |
concurrentSend |
false |
如果为true,则使用执行器扇出以使发送并行进行。这允许日志批量写入,从而减少磁盘io(5.12) |
transactedSend |
false |
如果为true,则将事务用于扇出发送,以便有单个磁盘同步。如果没有客户端事务,则将创建本地代理事务(5.13) |
dropOnResourceLimit |
false |
如果为true,则忽略扇出期间抛出的任何ResourceAllocationException(请参阅:sendFailIfNoSpace策略条目)(5.16) |
setOriginalDestination |
true |
如果为true,则转发消息的目的地设置为消费者队列,并且originalDestination消息属性跟随虚拟主题(5.16) |
6.4 broker网络中避免重复消息
通常情况下,使用网络连接消费者队列。在这种情况下,不要在虚拟主题上桥接任何普通主题消费者,因为任何转发的消息都会再次分散到网络代理上的消费者队列,从而导致消息重复。下面示例展示了如何排除虚拟队列:
<networkConnectors> <networkConnector uri="static://([tcp://localhost:61617](tcp://localhost:61617))">
<excludedDestinations>
<queue physicalName="Consumer.*.VirtualTopic.>"/>
</excludedDestinations>
</networkConnector> </networkConnectors>
7 Mirrored Queues(镜像队列)
ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。 使用ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的“useMirroredQueues“属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是“VirtualTopic.Mirror.”。如下订阅“*.qmirror”的主题就开启了mirrored queue。
<broker xmlns=http://activemq.apache.org/schema/core useMirroredQueue="true">
<destinationInterceptors>
<mirroredQueue copyMessage="true" postfix=".qmirror" prefix="" />
</destinationInterceptors>
</broker>
网友评论