美文网首页GIS文章集开源GIS+空间数据应用PostgreSQL
基于BottledWater-PG+nodejs实时地图应用实践

基于BottledWater-PG+nodejs实时地图应用实践

作者: 遥想公瑾当年 | 来源:发表于2017-07-07 17:19 被阅读469次

    前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。

    一 服务器端

    var fs = require('fs');
    var http = require('http');
    var socket = require('socket.io');
    var Kafka = require('node-rdkafka');
        
    var server = http.createServer(function(req, res) {
        
        res.writeHead(200, { 'Content-type': 'text/html'});
        res.end(fs.readFileSync(__dirname + '/index.html'));
        }).listen(8081, function() {
            console.log('Listening at: http://localhost:8081');
    });
    //注册socket.io
    var socketio=socket.listen(server);
    socketio.on('connection', function (socketclient) {
        console.log('已连接socket:');
        //socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人
        //socketclient.emit('GPSCoor', data.payload);//广播给自己
        
    });
    
    var consumer = new Kafka.KafkaConsumer({
      //'debug': 'all',
      'metadata.broker.list': '192.168.43.27:9092',
      'group.id': 'node-rdkafka-consumer-flow-example',
      'enable.auto.commit': false
    });
    
    var topicName = 'gps';
    
    //logging debug messages, if debug is enabled 
    consumer.on('event.log', function(log) {
      console.log(log);
    });
    
    //打印错误
    consumer.on('error', function(err) {
      console.error('Error from consumer');
      console.error(err);
    });
    
    consumer.on('ready', function(arg) {
      console.log('consumer ready.' + JSON.stringify(arg));
      consumer.subscribe([topicName]);
      //准备消费消息
      consumer.consume();
    });
    
    consumer.on('data', function(m) {
        console.log(m);
        let _data;
        if(m.value==null)//delete操作发送来的消息
        {
            _data=JSON.parse(m.key);
            _data.tg_op='delete';
        }
        else{
            _data=m.value.toString();
            _data=JSON.parse(_data);
        }   
        console.log(_data);
        socketio.emit('GPSCoor', _data);//广播给所有的客户端
    });
    
    consumer.on('disconnected', function(arg) {
      console.log('consumer disconnected. ' + JSON.stringify(arg));
    });
    
    //启动
    consumer.connect();
    

    二 客户端

    <html>
    <head>
        <meta charset='utf-8'>
        <title>实时地图应用</title>
        <link rel="stylesheet" href="http://openlayers.org/en/v3.18.2/css/ol.css" type="text/css">
        <script src="http://openlayers.org/en/v3.18.2/build/ol.js"></script>
        <script src="/socket.io/socket.io.js"></script>
        <script>
                var wktform=new ol.format.WKT();//wkt解析
                var gpsSource=new ol.source.Vector();
                function init(){
                    var gpsLayer=new ol.layer.Vector({
                        source:gpsSource,
                        style:new ol.style.Style({
                            image: new ol.style.Icon(({
                                anchor: [0.5, 1],
                                src: 'http://openlayers.org/en/v3.18.2/examples/data/icon.png'
                            }))
                        })
                    });
                    var map = new ol.Map({
                        layers : [
                            new ol.layer.Tile({
                                title : '街道图',
                                visible : true,
                                source : new ol.source.XYZ({
                                    url : 'http://www.google.cn/maps/vt?pb=!1m5!1m4!1i{z}!2i{x}!3i{y}!4i256!2m3!1e0!2sm!3i342009817!3m9!2szh-CN!3sCN!5e18!12m1!1e47!12m3!1e37!2m1!1ssmartmaps!4e0&token=32965'
                                })
                            }),
                            gpsLayer
                        ],
                        target : 'map',
                        controls : ol.control.defaults({
                            attributionOptions : 
                            ({
                                collapsible : false
                            })
                        }),
                        view : new ol.View({
                            center : [0, 0],
                            zoom : 2
                        })
                    });
                    var iosocket = io.connect();
                    //接受服务端消息
                    iosocket.on('GPSCoor', function(data) {
                        console.log(data);
                        var id=data.id.int;
                        var feature;
                        if(data.tg_op=='delete'){
                            feature=gpsSource.getFeatureById(id);
                            if(feature)
                                gpsSource.removeFeature(feature);//删除点
                        }
                        else{
                            var geom=data.geom.string;
                            geom=wktform.readGeometry(geom);
                            geom.transform('EPSG:4326','EPSG:3857');
                            feature=gpsSource.getFeatureById(id);
                            if(feature)
                                feature.setGeometry(geom);//修改已有点
                            else{
                                feature=new ol.Feature({
                                    geometry:geom
                                });
                                feature.setId(id);
                                gpsSource.addFeature(feature);//地图新增点
                            }
                        }
                    });
                }
                
               
        </script>
    </head>
    <body onload="init()">
        <div id="map"></div>
    </body>
    </html>
    

    三 测试成果

    3.1 新增

    mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
    INSERT 0 1
    mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
    INSERT 0 1
    
    新增成果.png

    3.2 修改

    mcsas=# update gps set geom='Point(115 40)' where name='opy';
    UPDATE 1
    
    修改成果.png

    3.3 删除

    mcsas=# delete from gps where name='opy';
    DELETE 1
    
    
    删除结果.png

    四 总结

    BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
      作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
      匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。

    相关文章

      网友评论

        本文标题:基于BottledWater-PG+nodejs实时地图应用实践

        本文链接:https://www.haomeiwen.com/subject/wmqrhxtx.html