项目里遇到需要在ActiveMQ上记录任务日志的场景。其实AMQ本身自带一个LoggingPlugin,但是用起来总是不切合项目的实际场景。思来想去,正好前段时间为其他项目做了个MQTT协议认证的插件,技术基础已经有了,还是自己给项目写一个定制化版的插件吧。
在我之前的文章ActiveMQ插件开发里介绍了如何开发一个AMQ的插件。其实这次的功能就是基于之前的代码里的内容进行修改的。主要功能是每次AMQ接收到一个任务消息后,就往一台服务器上使用HTTP POST方法发送一条消息。表示任务流经MQ。
先来看入口类,相比之前的代码,增加了两个参数,这两个参数可以在配置文件activemq.xml中实现手动配置。
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageLogPlugin implements BrokerPlugin {
private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
private String seviceUrl;
private String sign;
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageLogPlugin");
return new MessageLog(broker,serviceUrl,sign);
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl=serviceUrl;
}
……
}
主要功能在MessageLog类中,实现了几个功能:
- 每来一个任务消息,判断消息是否曾经来过,如果是第一次收到,则发送一条post消息到服务器上
- 记录一个任务的消息数量
- 为了防止任务数量无限增长,设置了定时清理机制(但是由于每个任务都设置了Timer,适用的场景应该是任务较少或者任务可清理时间较短的场景,否则也是对资源的消耗)
- 区分任务上行还是任务下行
package com.cn.amqs;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 实现每次任务到达MQ时自动往一个地址上送一条信息
* @author MiSterRabbit
*/
public class MessageLog extends BrokerFilter{
private Log log;
/**下行任务HashMap*/
private ConcurrentHashMap<Object, Integer> downWards;
/**上行任务HashMap*/
private ConcurrentHashMap<Object, Integer> upWards;
private String seviceUrl;
private String sign;
public MessageLog(Broker next,String seviceUrl,String sign) {
super(next);
downWards = new ConcurrentHashMap<Object, Integer>();
upWards = new ConcurrentHashMap<Object, Integer>();
this.seviceUrl=seviceUrl;
this.sign=sign.isEmpty()? "分部":"总部";
log = LogFactory.getLog(com.cn.amqs.MessageLog.class);
log.info("initialize Message Log plugin");
}
/**
* Timer类,实现定时清理日志HashMap,防止Map的无限增长
*/
class missionTimer extends TimerTask {
private String missionID;
private Log log;
private ConcurrentHashMap<Object, Integer> map;
public missionTimer(String missionID, Log log, ConcurrentHashMap<Object, Integer> map) {
this.missionID=missionID;
this.log=log;
this.map=map;
}
@Override
public void run() {
this.map.remove(missionID);
this.log.info("[FLOW_LOG] Remove expired mission: "+missionID);
}
}
/**
* 判断日志是否在map中,如果不在,则发送一条消息,若存在,则增加计数器
* @param missionID 任务号
* @param map 任务下发和任务上送使用不同的map
*/
public synchronized void insertIntoMap(String missionID, ConcurrentHashMap<Object,Integer> map,String direction) {
if(map.containsKey(missionID)) {
int count = map.get(missionID)+1;
map.put(missionID,count);
this.log.debug("[FLOW_LOG] "+map);
} else{
map.put(missionID,1);
this.log.info("[FLOW_LOG] Receive a new "+direction+" mission: "+missionID);
// 开启一个线程发送一条任务数据,这里的MissionSend类其实就是开启一个线程发送一条http post消息
if (direction.equalsIgnoreCase("DOWNWARD")){
MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到下行任务", "ok", this.seviceUrl);
new Thread(tmqs,"mission_send").start();
} else {
MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到上行任务", "ok", this.seviceUrl);
new Thread(tmqs,"mission_send").start();
}
// 使用Timer定时清理,1800秒后清理这个任务
Timer timer =new Timer();
TimerTask task = new missionTimer(missionID,this.log,downWards);
timer.schedule(task,1800000);
}
}
/**
* 每当MQ收到一条生产者发送过来的消息的时候执行判断。
*/
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
// 如果任务ID不为空,且不是经由集群内部发过来的消息
if ((messageSend.getProperty("misid") != null) &&
(!producerExchange.getProducerState().getInfo().getProducerId().toString().contains("MQ_"))) {
// 如果目的地不包含UPLAOD字段,则判断为消息下行,否则为消息上行。记录一个日志,然后调用insertIntoMap判断是否需要发送http post消息
if (!messageSend.getDestination().toString().toLowerCase().contains("upload")) {
this.log.info("[FLOW_LOG] Down Mission: " + messageSend.getProperty("misid") + ". Destination: " + messageSend.getDestination() + ". Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
insertIntoMap(messageSend.getProperty("misid").toString(),downWards,"DOWNWARD");
} else {
this.log.info("[FLOW_LOG] Up Mission: " + messageSend.getProperty("misid") + ". Destination: " + messageSend.getDestination()+". Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
insertIntoMap(messageSend.getProperty("misid").toString(),upWards,"UPWARD");
}
}
super.send(producerExchange, messageSend);
}
}
MissionSend的类可以自由扩展。我就不赘述了。
插件功能为AMQ带来了极强的扩展性,用户可以实现在不对现有功能进行修改的前提下进行功能的二次开发。有空我会整理一个插件可以实现的功能清单。其实如果有空,看看BrokerFilter这个类,就能明白插件能实现的功能了。
网友评论