美文网首页程序员
ActiceMQ实现mqtt协议的消息选择器功能

ActiceMQ实现mqtt协议的消息选择器功能

作者: MisterCH | 来源:发表于2017-04-08 22:49 被阅读651次

    如果要在amq上使用mqtt协议,有一个很大的问题是mqtt协议无法使用amq的消息选择器功能,这样就会造成一个很麻烦的情况:

    • 如果要用mqtt协议点对点发送消息,那么就需要为每个mqtt连接单独建立一个queue或者topic,可amq对大量destination的支持并不好,测试中发现无法支撑超过9000个queue。也就是说如果要用amq实现mqtt连接的点对点发送,连接数就不能超过9000个。

    在网上找了一圈,只有一篇类似的解决方法,是大神kimmking写的:
    使用ActiveMQ+MQTT实现Android点对点消息通知

    但是这个方法太过麻烦,而且随着mq版本升级,需要每次都改一下broker的代码。

    趁着今晚有空,根据之前的思路研究了一下这个问题的解决办法。

    消息选择器的功能有两个要素:

    • 消息生产者在消息属性里带上筛选的字段
    • 消费者在建立消费者的时候,注明消息选择器

    难点是mqtt协议既不支持消息属性,也不支持带上选择器建立消费者。

    我的项目里由于发送方可以不用mqtt协议,所以我主要考虑解决第二个难点,其实第一个难点的解决方法也类似,也可以通过插件实现。

    第二个难点的解决方法是通过编写插件,在broker执行addConsumer的时候,为每一个consumer自动加上一个ClientID='客户端clientID'这样的消息选择器。插件的编写方法见我上一篇文章
    ActiveMQ插件开发

    代码如下:

    package com.icbc.amqs;
    import org.apache.activemq.broker.Broker;   
    import org.apache.activemq.broker.BrokerPlugin;   
    import org.apache.activemq.plugin.StatisticsBrokerPlugin;
    import org.apache.commons.logging.Log;   
    import org.apache.commons.logging.LogFactory;  
    
    public class MqttSelectorPlugin implements BrokerPlugin {   
    
    private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);  
    
    public Broker installPlugin(Broker broker) throws Exception {   
    log.info("install MqttSelectorPlugin ");   
    return new MqttSelector(broker);   
    }   
    }  
    
    package com.icbc.amqs;
    import org.apache.activemq.broker.Broker;   
    import org.apache.activemq.broker.BrokerFilter;   
    import org.apache.activemq.broker.ConnectionContext;
    import org.apache.activemq.broker.region.Subscription;
    import org.apache.activemq.command.ConsumerInfo;
    import org.apache.activemq.plugin.StatisticsBrokerPlugin;
    import org.apache.commons.logging.Log;   
    import org.apache.commons.logging.LogFactory;  
    
    public class MqttSelector extends BrokerFilter{
    private Log log;  
    
    public MqttSelector(Broker next) {   
    super(next);   
    log = LogFactory.getLog(StatisticsBrokerPlugin.class); 
    log.info("initialize Message Log plugin");
    }  
        
    //需要注意的是mqtt协议建立消费者的时候。consumerInfo里不会带上clientID,只能从ConnectionContext中取。
        @Override
        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (info.getDestination().toString().contains("test")){
        info.setSelector("ClientID='"+context.getClientId()+"'");
        log.info("[Consumer] Adding consumer : "+ info);
        }     
            return super.addConsumer(context, info);
        }
    
    }  
    

    经测试,可以让mqtt协议的消费者正常接收消息。

    相关文章

      网友评论

        本文标题:ActiceMQ实现mqtt协议的消息选择器功能

        本文链接:https://www.haomeiwen.com/subject/wjprattx.html