ActiveMQ插件开发

作者: MisterCH | 来源:发表于2017-04-06 17:03 被阅读189次

    AMQ有开放插件开发接口,今天研究了一下,直接给项目组加了个消息流监控的功能,看起来还是挺好的。

    开发步骤是:
    1.开发插件jar包
    我写的这个插件主要作用是当有消息被发送的时候,就往日志中记录一条。不使用amq自带的logging plugin的主要原因是格式问题,而且我只需要筛选某几个应用的消息。
    注:BrokerFilter这个类中的方法决定了插件能实现的功能,比如连接建立、连接断开、消息到达、消息过期等等都可以自定义操作。
    程序如下

    //入口类
    package com.cn.amqs;
    import org.apache.activemq.broker.Broker;   
    import org.apache.activemq.broker.BrokerPlugin;   
    import org.apache.activemq.plugin.StatisticsBrokerPlugin;
    import org.apache.commons.logging.Log;   
    import org.apache.commons.logging.LogFactory;  
    
    public class MessageLogPlugin implements BrokerPlugin {   
    
      private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);  
    
      public Broker installPlugin(Broker broker) throws Exception {   
        log.info("install MessageLogPlugin");   
        return new MessageLog(broker);   
      }   
    }  
    
    //主要实现功能的类
    package com.cn.amqs;
    
    import org.apache.activemq.broker.Broker;   
    import org.apache.activemq.broker.BrokerFilter;   
    import org.apache.activemq.broker.ProducerBrokerExchange;
    import org.apache.activemq.command.Message;
    import org.apache.activemq.plugin.StatisticsBrokerPlugin;
    import org.apache.commons.logging.Log;   
    import org.apache.commons.logging.LogFactory;  
    
    //BrokerFilter这个类包含了broker的很多操作,可以看看源码了解一下
    public class MessageLog extends BrokerFilter{
      private Log log;  
    
      public MessageLog(Broker next) {   
        super(next);   
        log = LogFactory.getLog(StatisticsBrokerPlugin.class); 
        log.info("initialize Message Log plugin")
      }  
        
        @Override
        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
    //在这里实现筛选
          if (messageSend.getProperty("appname")!=null && messageSend.getProperty("misid")!=null){
            if (!messageSend.getProperty("appname").equals("OSCM")&&!messageSend.getProperty("appname").equals("OSPM"))
              log.info("[FLOW_LOG] MissionID:  "+messageSend.getProperty("misid")+" Destination: "+messageSend.getDestination());
            }
          super.send(producerExchange, messageSend);
        }
    }  
    

    2.打上面的程序打成jar包后,将jar包放在amq的lib目录下。

    3.修改activemq.xml文件
    在activemq.xml文件中的plugins标签下添加如下的一段代码

    <plugins>
      <bean xmlns="http://www.springframework.org/schema/beans" id="MessageLogPlugin" class="com.icbc.amqs.MessageLogPlugin">
    </plugins>
    

    4.重启MQ,消息流的日志就会显示在data目录下的activemq.log中了

    相关文章

      网友评论

        本文标题:ActiveMQ插件开发

        本文链接:https://www.haomeiwen.com/subject/ihpqattx.html