在现场施工快一个月,历经前期艰难调试摸索,目前系统已能正常运行,但是有的模块作的很低效,我试图将其改造一下。
现状:
就拿最基本的一个功能点来说,在地图页面上实时显示地理数据(例如:卡点显示实时过车、实时显示车辆GPS位置等),以前我的处理方式是采用页面定时器,间隔一段时间去查询后台数据库,过车数据则通过前置设备采集通过Hessian远程调用将过车数据插入后台数据库,其实现方法简单。但是最大的弊病在于低效和易错,由于前置采集设备采集和执行Hessian,如果网络状态不佳,过车数据将会丢失,同时存入数据库的时间不可能严格真实,但是我的页面上定时器是严格按照时间去查询,容易出现漏查现象,而且频繁的对数据库进行操作,增加了数据库的负担。
改造:
1.引入消息中间件,前置设备作为消息生产者,将采集到的数据装入消息中间件中
2.前端页面通过WebSocket和后台进行连接,当消息中间件中有消息时立即消费,通过WebSocket传给前端页面
改造后结构是这样
实现效果:
image.png
如何实现:
1.安装Kafka,Kafka在Windows安装运行,(建议有条件还是上Linux系统)
2.后端通过新建监听线程池和处理线程池来实现对消息中间件的监听和消息的消费,并且利用netty-socketio进行订阅和广播
@PostConstruct
public void startConsuming() {
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(9093);
final SocketIOServer server = new SocketIOServer(config);
server.addEventListener("chatevent", GPSMessage.class, new DataListener<GPSMessage>() {
@Override
public void onData(SocketIOClient client, GPSMessage data, AckRequest ackRequest) {
// broadcast messages to all clients
server.getBroadcastOperations().sendEvent("chatevent", data);
}
});
server.start();
listenerExecService = Executors.newFixedThreadPool(LISTENER_NUM_THREADS);
handlerExecService = Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS);
consumerConfig = consumerConfigFactory.getConsumerConfig();
topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, CONSUMER_NUM_THREADS);
gson = new Gson();
handlerTaskFactory = (String gpsText) -> new GPSTask(gpsText, gson, server);
super.start();
}
public void start() {
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.getConsumerConfig());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(this.getTopicCountMap());
Iterator var3 = consumerMap.entrySet().iterator();
while(var3.hasNext()) {
Entry<String, List<KafkaStream<byte[], byte[]>>> entry = (Entry)var3.next();
List<KafkaStream<byte[], byte[]>> streams = (List)entry.getValue();
int threadNumber = 0;
for(Iterator var7 = streams.iterator(); var7.hasNext(); ++threadNumber) {
KafkaStream<byte[], byte[]> stream = (KafkaStream)var7.next();
this.getListenerExecService().submit(new ConsumerListenTask(stream, threadNumber, this.getHandlerTaskFactory((String)entry.getKey()), this.getHandlerExecService()));
}
}
}
public class ConsumerListenTask implements Runnable {
public void run() {
ConsumerIterator it = this.kafkaStream.iterator();
while(it.hasNext()) {
byte[] messageData = (byte[])it.next().message();
String reply = new String(messageData);
this.executorPool.submit((Runnable)this.executorTaskFactory.apply(reply));
System.out.println("Consumed Thread:" + this.threadNumber + ".Consuming User: " + reply);
}
System.out.println("Shutting down Thread: " + this.kafkaStream);
}
}
public class GPSTask implements Runnable {
@Override
public void run() {
try{
GPSMessage gpsMessage = gson.fromJson(this.gpsText, GPSMessage.class);
System.out.println("+++++++++++从kafka中取到车牌号"+gpsMessage.getHphm());
socketIOServer.getBroadcastOperations().sendEvent("chatevent", gpsMessage);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.前端页面利用socket.io监听事件就行
socket.on('chatevent', function(data) {
console.log(data);
switch(data.operate){
case 'create':
var coordinate = ol.proj.transform([Number(data.lon), Number(data.lat)], 'EPSG:4326', 'EPSG:3857');
var point = new ol.geom.Point(coordinate);
var markerFeature = new ol.Feature({
name: data.hphm,
type: 'position',
geometry: point
});
vectorSource.addFeature(markerFeature);
break;
}
});
4.通过kafka自带的messageproducer,注册topic并模拟过车数据,将消息发往消息中间件,后台通过监听线程池来及时消费该消息,最后通过netty-socketio广播到前端页面
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic gpsinfo
目前只做了个Demo,接下来可能会遇到的问题,消息生产和消费不同步,路上过车数据将会是短时内大量过车,会生产大量的过车消息;而页面显示不可能和消息同步,否则一闪而过影响显示效果,但处理不及时将会造成大量消息积压,有待进一步思考尝试。
网友评论