美文网首页
项目重构记

项目重构记

作者: questionuncle | 来源:发表于2017-08-27 14:36 被阅读0次

在现场施工快一个月,历经前期艰难调试摸索,目前系统已能正常运行,但是有的模块作的很低效,我试图将其改造一下。
现状:
就拿最基本的一个功能点来说,在地图页面上实时显示地理数据(例如:卡点显示实时过车、实时显示车辆GPS位置等),以前我的处理方式是采用页面定时器,间隔一段时间去查询后台数据库,过车数据则通过前置设备采集通过Hessian远程调用将过车数据插入后台数据库,其实现方法简单。但是最大的弊病在于低效和易错,由于前置采集设备采集和执行Hessian,如果网络状态不佳,过车数据将会丢失,同时存入数据库的时间不可能严格真实,但是我的页面上定时器是严格按照时间去查询,容易出现漏查现象,而且频繁的对数据库进行操作,增加了数据库的负担。
改造:
1.引入消息中间件,前置设备作为消息生产者,将采集到的数据装入消息中间件中
2.前端页面通过WebSocket和后台进行连接,当消息中间件中有消息时立即消费,通过WebSocket传给前端页面
改造后结构是这样

结构图.jpg
实现效果:
image.png
如何实现:
1.安装Kafka,Kafka在Windows安装运行,(建议有条件还是上Linux系统)
2.后端通过新建监听线程池和处理线程池来实现对消息中间件的监听和消息的消费,并且利用netty-socketio进行订阅和广播
   @PostConstruct
    public void startConsuming() {
        Configuration config = new Configuration();
        config.setHostname("localhost");
        config.setPort(9093);
        final SocketIOServer server = new SocketIOServer(config);
        server.addEventListener("chatevent", GPSMessage.class, new DataListener<GPSMessage>() {
            @Override
            public void onData(SocketIOClient client, GPSMessage data, AckRequest ackRequest) {
                // broadcast messages to all clients
                server.getBroadcastOperations().sendEvent("chatevent", data);
            }
        });
        server.start();

        listenerExecService = Executors.newFixedThreadPool(LISTENER_NUM_THREADS);
        handlerExecService = Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS);
        consumerConfig = consumerConfigFactory.getConsumerConfig();
        topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, CONSUMER_NUM_THREADS);
        gson = new Gson();

        handlerTaskFactory = (String gpsText) -> new GPSTask(gpsText, gson, server);

        super.start();
    }
public void start() {
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.getConsumerConfig());
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(this.getTopicCountMap());
        Iterator var3 = consumerMap.entrySet().iterator();

        while(var3.hasNext()) {
            Entry<String, List<KafkaStream<byte[], byte[]>>> entry = (Entry)var3.next();
            List<KafkaStream<byte[], byte[]>> streams = (List)entry.getValue();
            int threadNumber = 0;

            for(Iterator var7 = streams.iterator(); var7.hasNext(); ++threadNumber) {
                KafkaStream<byte[], byte[]> stream = (KafkaStream)var7.next();
                this.getListenerExecService().submit(new ConsumerListenTask(stream, threadNumber, this.getHandlerTaskFactory((String)entry.getKey()), this.getHandlerExecService()));
            }
        }

    }
public class ConsumerListenTask implements Runnable {
    public void run() {
        ConsumerIterator it = this.kafkaStream.iterator();

        while(it.hasNext()) {
            byte[] messageData = (byte[])it.next().message();
            String reply = new String(messageData);
            this.executorPool.submit((Runnable)this.executorTaskFactory.apply(reply));
            System.out.println("Consumed Thread:" + this.threadNumber + ".Consuming User: " + reply);
        }

        System.out.println("Shutting down Thread: " + this.kafkaStream);
    }
}
public class GPSTask implements Runnable {
    @Override
    public void run() {
        try{
            GPSMessage gpsMessage = gson.fromJson(this.gpsText, GPSMessage.class);
            System.out.println("+++++++++++从kafka中取到车牌号"+gpsMessage.getHphm());
            socketIOServer.getBroadcastOperations().sendEvent("chatevent", gpsMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.前端页面利用socket.io监听事件就行

socket.on('chatevent', function(data) {
        console.log(data);
        switch(data.operate){
          case 'create':
            var coordinate = ol.proj.transform([Number(data.lon), Number(data.lat)], 'EPSG:4326', 'EPSG:3857');
            var point = new ol.geom.Point(coordinate);
            var markerFeature = new ol.Feature({
                name: data.hphm,
                type: 'position',
                geometry: point
            });
            vectorSource.addFeature(markerFeature);
            break;
        }

      });

4.通过kafka自带的messageproducer,注册topic并模拟过车数据,将消息发往消息中间件,后台通过监听线程池来及时消费该消息,最后通过netty-socketio广播到前端页面

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic gpsinfo

目前只做了个Demo,接下来可能会遇到的问题,消息生产和消费不同步,路上过车数据将会是短时内大量过车,会生产大量的过车消息;而页面显示不可能和消息同步,否则一闪而过影响显示效果,但处理不及时将会造成大量消息积压,有待进一步思考尝试。

相关文章

  • 项目重构记

    在现场施工快一个月,历经前期艰难调试摸索,目前系统已能正常运行,但是有的模块作的很低效,我试图将其改造一下。现状:...

  • iOS 基于 MVC 的项目重构总结

    iOS 基于 MVC 的项目重构总结 iOS 基于 MVC 的项目重构总结

  • 视频旋转最终章

    前记 项目中有视频旋转的功能需求,当初重构的时候对视频这块不太熟,直接抄的原来项目中的,原来项目中使用的开源项目R...

  • iOS项目重构周记(一)

    最近开始做公司的iOS项目重构,现准备每周做一次汇总,把重构过程中遇到的问题和解决方案记录下来,做一个记录和分享。...

  • iOS项目重构周记(二)

    继续上一篇,本周的重构重点是UI部分代码的优化。 1. AutoLayout及Masonry AutoLayout...

  • 项目重构

    前言 好孕帮APP截止到现在已经更新过数十个版本,随着需求的增多和功能的变化,原有的架构已经不再适合现有的业务逻辑...

  • 项目重构

    前言 最近又开始看设计模式了,这是我第三次看设计模式了,每一次看都会有不同的感觉,不同的收获,真是不同的时间段去看...

  • Bitmap优化

    最近重构项目时,显示图片的列表特别多,AS捕捉下来的内存,Bitmap使用的内存占比多,所以做BitMap优化,记...

  • Android 架构设计之项目重构【组件化篇】

    项目演示: 1.组件化重构效果 这里先看下我们重构前后的框架图比较: 重构前: 重构后 ft_xxx表示业务层模块...

  • 我的Android重构之旅:插件化篇

    我的Android重构之旅:架构篇我的Android重构之旅:框架篇我的Android重构之旅:插件化篇 随着项目...

网友评论

      本文标题:项目重构记

      本文链接:https://www.haomeiwen.com/subject/kjpbdxtx.html