美文网首页
视图库过车数据模拟工具设计(下)

视图库过车数据模拟工具设计(下)

作者: 简单是美美 | 来源:发表于2021-08-19 16:52 被阅读0次

    4. 关键算法

      生成行车轨迹的算法描述:对于每个过车agent,会随机选择一个车牌,随机选择一个卡口作为起始卡口,生成一个行车轨迹。在所行驶的坐标范围内,在某个卡口点位可选择的行驶方向如下图所示:


    图4

      由图4可知,在不同的坐标可选择的行驶方向也有不同,可选择4个方向,也可能选择3个方向,也可能选择2个方向。行车轨迹生成算法需要根据当前卡口坐标,在候选方向集合中随机选择一个方向来模拟行车。
      对于每个行车轨迹的长度,即经过的卡口点的数量,可随机生成,建议最大距离为行驶范围矩形对角线的长度。
      行车轨迹模拟算法如下所示:

    名称:模拟行车轨迹
    输入参数:无
    输出参数:无
    过程:
     //随机选择一个卡口
    startTollgate = getRandomTollgate(); 
    //随机选择一个车牌并锁定该车牌
    plate = getRandomPlate();
    //将起始过车写入过车数据流
    motorvehicle = createMotorvehicle(startTollgate, plate);
    writeMotorvehicleDatastream(motorvehicle);
    //随机生成行驶长度
    distance = getRandomDistance();
    //随机生成行驶速度,在20~80中选个随机数
    speed = getRandomSpeed();
    //计算模拟的每条过车记录,并写入过车数据流中
    currentTollgate = startTollgate;
    For(int i=0; i< distance; i++){
      //根据当前卡口能走的方向计算出随机行驶方向
      direction = simuRandomDirection(currentTollgate);
      //根据速度计算出经过下一卡口的过车时间
      plate.lasttime = calcPassTime(speed);
      //根据当前卡口和行驶方向计算出下一卡口
      nextTollgate = calcNextTollgate(currentTollgate, direction);
      //将模拟的过车写入过车数据流
      motorvehicle = createMotorvehicle(nextTollgate, plate);
      writeMotorvehicleDatastream(motorvehicle);
      //进入下一次循环
      currentTollgate = nextTollgate
    }
    

    5. 运行方式

      目前我们的视图数据存储使用hdfs和hive,建表工具使用iceberg。因此我们写了一个flink job来写入到iceberg的过车表中。flink job中自定义生成过车数据的source,过车写入datastream中,最后写入为iceberg表的sink中。过车agent的source代码如下:

    /**
     * Title: CommonMTAgent.java
     * Description: 通用过车agent
     * author zhang.kai  
     * Copyright: Copyright (c) 2025
     * Company: www.kedacom.com 
     * date: 2021年7月21日-下午2:37:32
     * version: V1.0
     */
    package com.kedacom.simumotor.agent;
    
    import java.io.Serializable;
    import java.util.Date;
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import com.kedacom.simumotor.config.SimuConfig;
    import com.kedacom.simumotor.pojo.Motorvehicle;
    import com.kedacom.simumotor.pojo.Plate;
    import com.kedacom.simumotor.pojo.SimuTollgate;
    import com.kedacom.simumotor.pool.PoolManager;
    import com.kedacom.simumotor.util.MotorVehicleUtils;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class CommonMTAgent implements Runnable, Serializable {
    
        /**
         * 
         */
        private static final long serialVersionUID = -183630777130253681L;
    
        private SourceContext<RowData> sc;
    
        public String name;
    
        public CommonMTAgent() {
    
        }
    
        public CommonMTAgent(SourceContext<RowData> ctx) {
            sc = ctx;
        }
    
        public void start() {
            startSimuData();
        }
    
        @Override
        public void run() {
            log.info("************** plateLastAppearTime:{}",new Date(SimuConfig.plateLastAppearTime).toString());
            long count = 0;
            while (!SimuConfig.ifstop) {
    
                try {
                    // 获取随机卡口和车牌
                    SimuTollgate tollgate = PoolManager.getRandomTollgate();
                    Plate plate = PoolManager.getRandomPlate();
                    // 生成行车轨迹
                    synchronized (plate) {
                        if (plate.getPlateLastTime() < System.currentTimeMillis()) {
                            plate.setPlateLastTime(System.currentTimeMillis());
                        }
                        // 在上次出现的时间上加上间隔
                        plate.setPlateLastTime(plate.getPlateLastTime() + SimuConfig.APPEAR_DELAY * 1000);
                        writeMotorvehicle2Stream(tollgate, plate);
                        count++;
                        // 生成随机轨迹
                        int pathLength = MotorVehicleUtils.getRandomPathLength();
    
                        for (int i = 0; i < pathLength; i++) {
                            tollgate = MotorVehicleUtils.getNextTollgate(tollgate);
                            if (null == tollgate) {
                                break;
                            }
    
                            MotorVehicleUtils.setNextAppearTime(plate);
                            writeMotorvehicle2Stream(tollgate, plate);
                            count++;
                        }
    
                    }
                    // 到达批量写入,进行计数
                    if (count >= SimuConfig.BATCH_WRITE_SIZE) {
                        log.info("batchWriteMv:{}", count);
                        // Thread.sleep(10000);
                        count = 0;
                    }
                } catch (Exception e) {
                    log.error("XXXXXX CommonMTAgent run error", e);
                }
            }
        }
    
        /**
         * 启动模拟数据线程
         */
        private void startSimuData() {
            Thread agentThread = new Thread(this);
            agentThread.setName("SimuThread-" + name);
            agentThread.start();
        }
    
        /**
         * 将生成的motorvehicle写入数据流中
         * 
         * @param tollgate
         * @param plate
         * @param metricqueue
         */
        private void writeMotorvehicle2Stream(SimuTollgate tollgate, Plate plate) {
            Motorvehicle mv = MotorVehicleUtils.getMotorvehicle(tollgate, plate);
            RowData row = transformMv(mv);
            sc.collect(row);
        }
    
        /**
         * 将motorvehicle对象转换为
         * 
         * @param mv
         * @return
         */
        private RowData transformMv(Motorvehicle mv) {
            GenericRowData rowData = new GenericRowData(16);
            long passtime = mv.getPassTime() / 1000;
            int days = (int) (passtime / (60 * 60 * 24));
            rowData.setField(0, days);
            rowData.setField(1, MotorVehicleUtils.wrap(mv.getMotorVehicleID()));
            rowData.setField(2, mv.getInfoKind());
            rowData.setField(3, MotorVehicleUtils.wrap(mv.getTollgateId()));
            rowData.setField(4, passtime);
            rowData.setField(5, MotorVehicleUtils.wrap(mv.getDeviceId()));
            rowData.setField(6, MotorVehicleUtils.wrap(mv.getStorageUrl1()));
            rowData.setField(7, mv.getLeftTopX());
            rowData.setField(8, mv.getLeftTopY());
            rowData.setField(9, mv.getRightBtmX());
            rowData.setField(10, mv.getRightBtmY());
            rowData.setField(11, MotorVehicleUtils.wrap(mv.getHasPlate()));
            rowData.setField(12, MotorVehicleUtils.wrap(mv.getPlateClass()));
            rowData.setField(13, MotorVehicleUtils.wrap(mv.getPlateColor()));
            rowData.setField(14, MotorVehicleUtils.wrap(mv.getPlateNo()));
            rowData.setField(15, MotorVehicleUtils.wrap(mv.getVehicleColor()));
            return rowData;
        }
    
    }
    

      经过测试,在我们自己的flink集群上,每小时能够写入约2亿条过车数据,一天大约能够写入近50亿过车数据。

    相关文章

      网友评论

          本文标题:视图库过车数据模拟工具设计(下)

          本文链接:https://www.haomeiwen.com/subject/nwlabltx.html