如果要在amq上使用mqtt协议,有一个很大的问题是mqtt协议无法使用amq的消息选择器功能,这样就会造成一个很麻烦的情况:
- 如果要用mqtt协议点对点发送消息,那么就需要为每个mqtt连接单独建立一个queue或者topic,可amq对大量destination的支持并不好,测试中发现无法支撑超过9000个queue。也就是说如果要用amq实现mqtt连接的点对点发送,连接数就不能超过9000个。
在网上找了一圈,只有一篇类似的解决方法,是大神kimmking写的:
使用ActiveMQ+MQTT实现Android点对点消息通知
但是这个方法太过麻烦,而且随着mq版本升级,需要每次都改一下broker的代码。
趁着今晚有空,根据之前的思路研究了一下这个问题的解决办法。
消息选择器的功能有两个要素:
- 消息生产者在消息属性里带上筛选的字段
- 消费者在建立消费者的时候,注明消息选择器
难点是mqtt协议既不支持消息属性,也不支持带上选择器建立消费者。
我的项目里由于发送方可以不用mqtt协议,所以我主要考虑解决第二个难点,其实第一个难点的解决方法也类似,也可以通过插件实现。
第二个难点的解决方法是通过编写插件,在broker执行addConsumer的时候,为每一个consumer自动加上一个ClientID='客户端clientID'这样的消息选择器。插件的编写方法见我上一篇文章
ActiveMQ插件开发
代码如下:
package com.icbc.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MqttSelectorPlugin implements BrokerPlugin {
private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MqttSelectorPlugin ");
return new MqttSelector(broker);
}
}
package com.icbc.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MqttSelector extends BrokerFilter{
private Log log;
public MqttSelector(Broker next) {
super(next);
log = LogFactory.getLog(StatisticsBrokerPlugin.class);
log.info("initialize Message Log plugin");
}
//需要注意的是mqtt协议建立消费者的时候。consumerInfo里不会带上clientID,只能从ConnectionContext中取。
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.getDestination().toString().contains("test")){
info.setSelector("ClientID='"+context.getClientId()+"'");
log.info("[Consumer] Adding consumer : "+ info);
}
return super.addConsumer(context, info);
}
}
经测试,可以让mqtt协议的消费者正常接收消息。
网友评论