美文网首页GIS后端
How it works(10) NodeODM源码阅读(A)

How it works(10) NodeODM源码阅读(A)

作者: 默而识之者 | 来源:发表于2019-03-15 21:46 被阅读2次

    引入

    OpenDroneMap(ODM)是一款非常强大的无人机成果处理软件,可以直接将无人机拍摄的照片处理成正摄影像甚至进行三维建模.ODM本身是基于python的OpenSFM编写的命令行工具,为了方便实际使用,NodeODM出现了.
    NodeODM是Nodejs编写的一套带有可视化界面的API,实现了通过接口上传图片,修改配置,获取进度等常用功能.因此我们一般用的都是NodeODM,很少会直接调用ODM.
    NodeODM对外是将命令行封装为接口,内部主要实现了以下的功能:

    • 任务排队与分配
    • 简单的鉴权
    • 任务执行后处理

    代码架构

    惯例是madge生成的结构图

    文件架构

    因为NodeODM是一个前后端一体项目,文件组织比较复杂,也需要说一下:


    从架构图可知,整个NodeODM围绕一大一下两个功能展开:

    • 鉴权相关
    • 任务相关

    鉴权

    NodeODM没有用户体系,因此采用的是静态token鉴权.因为整个NodeODM是基于express框架搭建的接口,所以所谓的鉴权,就是在请求中最先处理的中间件:

    const auth = require('./libs/auth/factory').fromConfig(config);
    const authCheck = auth.getMiddleware();
    app.get('/task/:uuid/info', authCheck, getTaskFromUuid, (req, res) => {
        res.json(req.task.getInfo());
    });
    

    鉴权中间件采用工厂模式:

    const NoTokenRequiredAuth = require('./NoTokenRequiredAuth');
    const SimpleTokenAuth = require('./SimpleTokenAuth');
    
    module.exports = {
        fromConfig: function(config){
            if (config.token){
                return new SimpleTokenAuth(config.token);
            }else{
                return new NoTokenRequiredAuth();
            }
        }
    }
    

    鉴权模型有两种:

    • 无鉴权
    • 简单鉴权

    这两者都基于一个抽象的鉴权基类:

    module.exports = class TokenAuthBase{
        initialize(cb){
            cb();
        }
        cleanup(cb){
            cb();
        }
        //虚函数,需要重写
        validateToken(token, cb){ cb(new Error("Not implemented"), false); }
        //生成中间件
        getMiddleware(){
            return (req, res, next) => {
                this.validateToken(req.query.token, (err, valid) => {
                    if (valid) next();
                    else{
                        res.json({ error: "Invalid authentication token: " + err.message });
                    }
                });
            };
        }
    };
    

    两种鉴权模式继承了该基类

    //简单鉴权
    module.exports = class SimpleTokenAuth extends TokenAuthBase{
        constructor(token){
            super(token);
            this.token = token;
        }
    
        validateToken(token, cb){ 
            if (this.token === token){
                return cb(null, true);
            }else{
                cb(new Error("token does not match."), false);
            }
        }
    };
    
    //无鉴权
    module.exports = class NoTokenRequiredAuth extends TokenAuthBase{
        // 永远返回true
        validateToken(token, cb){ cb(null, true); }
    };
    

    虽然这是一种聊胜于无的鉴权方式,但它定义了一个很好的模式:如果有第三种复杂/动态的鉴权方式,采用工厂模式可以保证对上层的影响最小.继承基类的方式能保证对整个鉴权模块的影响最小.

    任务

    相比可有可无的鉴权,任务是整个NodeODM最重要的部分.任务相关由三大部分组成:


    任务的运行流程也与这三部分都有关系:


    任务运行前

    由上面的流程图可知,任务的创建分4个小步骤:

    • 构建环境
    • 初始化配置
    • 存储图片
    • 任务开始运行

    构建环境

    初始化环境发生在NodeODM启动之时,只会初始化一次,主要目的是准备相应的文件夹,清理无用的旧任务文件夹等:

    constructor(done){
        this.tasks = {};
        //正在运行的任务
        this.runningQueue = [];
        //顺序执行下列清理
        async.series([
            cb => this.restoreTaskListFromDump(cb),
            cb => this.removeOldTasks(cb),
            cb => this.removeOrphanedDirectories(cb),
            cb => this.removeStaleUploads(cb),// 移除没有对应任务的文件夹
            cb => {
                this.processNextTask();//自动开始状态为排队的任务
                cb();
            },
            cb => {
                // 建立定时任务,每小时都进行清除,
                schedule.scheduleJob('0 * * * *', () => {
                    this.removeOldTasks();
                    this.dumpTaskList();//将配置文件存储在硬盘上
                    this.removeStaleUploads();
                });
                cb();
            }
        ], done);
    }
    
    //加载全部已存在的任务
    restoreTaskListFromDump(done){
        fs.readFile(TASKS_DUMP_FILE, (err, data) => {
            if (!err){
                let tasks;
                try{
                    tasks = JSON.parse(data.toString());
                }catch(e){
                    done(new Error(`It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}).`));
                    return;
                }
          //重新序列化每一个任务
                async.each(tasks, (taskJson, done) => {
                    Task.CreateFromSerialized(taskJson, (err, task) => {
                        if (err) done(err);
                        else{
                          //将任务添加回任务列表
                            this.tasks[task.uuid] = task;
                            done();
                        }
                    });
                }, err => {
                    logger.info(`Initialized ${tasks.length} tasks`);
                    if (done !== undefined) done();
                });
            }else{
                logger.info("No tasks dump found");
                if (done !== undefined) done();
            }
        });
    }
    
    //移除过早的任务
    removeOldTasks(done){
        let list = [];
        let now = new Date().getTime();
        logger.debug("Checking for old tasks to be removed...");
    
        for (let uuid in this.tasks){
            let task = this.tasks[uuid];
        //任务创建时间
            let dateFinished = task.dateCreated;
            if (task.processingTime > 0) dateFinished += task.processingTime;
        //收集所有过早的已失败已完成的已或取消的任务(允许存在正在运行或正在排队的过早任务)
            if ([statusCodes.FAILED,
                statusCodes.COMPLETED,
                statusCodes.CANCELED].indexOf(task.status.code) !== -1 &&
                now - dateFinished > CLEANUP_TASKS_IF_OLDER_THAN){
                list.push(task.uuid);
            }
        }
      //为什么不在上步直接remove?因为remove是异步的,上面的是同步方法,调用的话无法保证已完成
        async.eachSeries(list, (uuid, cb) => {
            logger.info(`Cleaning up old task ${uuid}`);
            this.remove(uuid, cb);
        }, done);
    }
    

    初始化配置

    从接口定义可以看出,初始化配置总共经历了简单鉴权,配置UUID到初始化任务:

    app.post('/task/new/init', authCheck, taskNew.assignUUID, taskNew.handleInit);
    

    UUID是追踪每一个任务的唯一标识,生成了UUID,就可以安心的构建该任务的不重名工作区文件夹:

    assignUUID: (req, res, next) => {
        //用户可以自行指定uuid
        if (req.get('set-uuid')){
            const userUuid = req.get('set-uuid');
            // 验证用户给定的UUID是否合法并且没有重名
            if (/^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(userUuid) && !TaskManager.singleton().find(userUuid)){
                req.id = userUuid;
                next();
            }else{
                res.json({error: `Invalid set-uuid: ${userUuid}`})
            }
        }else{
            //生成一个UUID
            req.id = uuidv4();
            next();
        }
    }
    handleInit: (req, res) => {
            req.body = req.body || {};
            //在临时文件夹下生成工作区
            const srcPath = path.join("tmp", req.id);
            const bodyFile = path.join(srcPath, "body.json");
            // 出错的时候删掉工作区
            const die = (error) => {
                res.json({error});
                removeDirectory(srcPath);
            };
        //顺序执行下列任务
            async.series([
                cb => {
                    // 检测指定的配置信息是否合法
                    if (req.body && req.body.options){
                        odmInfo.filterOptions(req.body.options, err => {
                            if (err) cb(err);
                            else cb();
                        });
                    }else cb();
                },
                cb => {
                  //检测要生成的文件夹是否不存在
                    fs.stat(srcPath, (err, stat) => {
                        if (err && err.code === 'ENOENT') cb();
                        else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
                    });
                },
                //创建该文件夹
                cb => fs.mkdir(srcPath, undefined, cb),
                //将配置序列化,也存入该文件
                cb => {
                    fs.writeFile(bodyFile, JSON.stringify(req.body), {encoding: 'utf8'}, cb);
                },
                //返回生成的UUID供客户端后面使用
                cb => {
                    res.json({uuid: req.id});
                    cb();
                }
            ], err => {
                if (err) die(err.message);
            });
       },
    

    存储图片

    从接口定义可以看出,存储图片总共经历了简单鉴权,验证UUID,图片上传,返回结果几步:

    app.post('/task/new/upload/:uuid', authCheck, taskNew.getUUID, taskNew.uploadImages, taskNew.handleUpload);
    

    上传图片前,首先验证的是UUID是否合法,确定要向哪个任务上传图片:

    getUUID: (req, res, next) => {
        req.id = req.params.uuid;
        if (!req.id) res.json({error: `Invalid uuid (not set)`});
        //验证是否存在uuid对应的文件夹下有配置文件存在
        const srcPath = path.join("tmp", req.id);
        const bodyFile = path.join(srcPath, "body.json");
        fs.access(bodyFile, fs.F_OK, err => {
            if (err) res.json({error: `Invalid uuid (not found)`});
            else next();
        });
    },
    

    上传图片的处理采用了express官方的multer模块:

    //接收表单中的"images"字段上传的图片
    uploadImages: upload.array("images"),
    const upload = multer({
        storage: multer.diskStorage({
            destination: (req, file, cb) => {
                let dstPath = path.join("tmp", req.id);
                fs.exists(dstPath, exists => {
                    if (!exists) {
                        fs.mkdir(dstPath, undefined, () => {
                            cb(null, dstPath);
                        });
                    } else {
                        cb(null, dstPath);
                    }
                });
            },
            filename: (req, file, cb) => {
                let filename = utils.sanitize(file.originalname);
                //因为body.json是内置的配置存储文件,避免在此被替换掉
                if (filename === "body.json") filename = "_body.json";
                cb(null, filename);
            }
        })
    });
    

    任务开始运行

    进入任务最重要的环节:执行.从接口定义可以看出,任务执行总共经历了简单鉴权,验证UUID,执行前处理,任务执行几步:

    app.post('/task/new/commit/:uuid', authCheck, taskNew.getUUID, taskNew.handleCommit, taskNew.createTask);
    

    在执行任务前,需要为执行准备参数,即待处理的任务文件和任务参数.从代码可以看出,NodeODM的参数都会固化在本地文件内,用的时候序列化,修改后再存入本地,而不是一直维持在内存里:

    handleCommit: (req, res, next) => {
            const srcPath = path.join("tmp", req.id);
            const bodyFile = path.join(srcPath, "body.json");
            //顺序执行下列动作
            async.series([
                //先读取配置文件(文件夹下有全部图片和配置文件)
                cb => {
                    fs.readFile(bodyFile, 'utf8', (err, data) => {
                        if (err) cb(err);
                        else {
                            try {
                                //序列化配置文件后删除它
                                const body = JSON.parse(data);
                                fs.unlink(bodyFile, err => {
                                    if (err) cb(err);
                                    else cb(null, body);
                                });
                            } catch (e) {
                                cb("Malformed body.json");
                            }
                        }
                    });
                },
                //读取所有图片(现在只剩图片了)
                cb => fs.readdir(srcPath, cb),
            ], (err, [body, files]) => {
                if (err) res.json({
                    error: err.message
                });
                else {
                    //将配置和文件列表挂载到request对象上,供后面的中间件使用
                    req.body = body;
                    req.files = files;
                    if (req.files.length === 0) {
                        req.error = "Need at least 1 file.";
                    }
                    next();
                }
            });
        },
    

    一切准备妥当,就可以开启任务了:

    createTask: (req, res) => {
        req.setTimeout(1000 * 60 * 20);
        const srcPath = path.join("tmp", req.id);
        // 遇到致命错误时,就返回错误信息,并尝试删除文件
        const die = (error) => {
            res.json({error});
            removeDirectory(srcPath);
        };
        if (req.error !== undefined){
            die(req.error);
        }else{
          //正式存储成果的文件夹
            let destPath = path.join(Directories.data, req.id);
            let destImagesPath = path.join(destPath, "images");
            let destGcpPath = path.join(destPath, "gcp");
            async.series([
              //验证配置参数是否合法
                cb => {
                    odmInfo.filterOptions(req.body.options, (err, options) => {
                        if (err) cb(err);
                        else {
                            req.body.options = options;
                            cb(null);
                        }
                    });
                },
                //保证最终成果文件夹不存在
                cb => {
                    if (req.files && req.files.length > 0) {
                        fs.stat(destPath, (err, stat) => {
                            if (err && err.code === 'ENOENT') cb();
                            else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
                        });
                    } else {
                        cb();
                    }
                },
                //如果提供的是url,则直接下载该zip文件
                cb => {
                    if (req.body.zipurl) {
                        let archive = "zipurl.zip";
                        upload.storage.getDestination(req, archive, (err, dstPath) => {
                            if (err) cb(err);
                            else{
                             //下载
                 const download = function(uri, filename, callback) {
                    request.head(uri, function(err, res, body) {
                        if (err) callback(err);
                        else request(uri).pipe(fs.createWriteStream(filename)).on('close', callback);
                    });
                  };
                 //getDestination函数返回的是当前任务的临时文件夹
                 //zip包最终会被下载到该文件夹
                              let archiveDestPath = path.join(dstPath, archive);
                              download(req.body.zipurl, archiveDestPath, cb);
                            }
                        });
                    } else {
                        cb();
                    }
                },
                //将所有文件从临时目录移到最终目录去
                cb => fs.mkdir(destPath, undefined, cb),
                cb => fs.mkdir(destGcpPath, undefined, cb),
                cb => mv(srcPath, destImagesPath, cb),
                cb => {
                    // 解压存在的zip文件
                    fs.readdir(destImagesPath, (err, entries) => {
                        if (err) cb(err);
                        else {
                            async.eachSeries(entries, (entry, cb) => {
                                if (/\.zip$/gi.test(entry)) {
                                    let filesCount = 0;
                                    fs.createReadStream(path.join(destImagesPath, entry)).pipe(unzip.Parse())
                                            .on('entry', function(entry) {
                                                if (entry.type === 'File') {
                                                    filesCount++;
                                                    entry.pipe(fs.createWriteStream(path.join(destImagesPath, path.basename(entry.path))));
                                                } else {
                                                    entry.autodrain();
                                                }
                                            })
                                            .on('close', () => {
                                                // 解压完成后检测是不是会文件过多
                                                if (config.maxImages && filesCount > config.maxImages) cb(`${filesCount} images uploaded, but this node can only process up to ${config.maxImages}.`);
                                                else cb();
                                            })
                                            .on('error', cb);
                                } else cb();
                            }, cb);
                        }
                    });
                },
                cb => {
                    // 寻找控制点描述文件,移动到正确的位置,并删除所有zip文件
                    // also remove any lingering zipurl.zip
                    fs.readdir(destImagesPath, (err, entries) => {
                        if (err) cb(err);
                        else {
                            async.eachSeries(entries, (entry, cb) => {
                                if (/\.txt$/gi.test(entry)) {
                                    mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb);
                                }else if (/\.zip$/gi.test(entry)){
                                    fs.unlink(path.join(destImagesPath, entry), cb);
                                } else cb();
                            }, cb);
                        }
                    });
                },
                // 创建任务并运行
                cb => {
                    new Task(req.id, req.body.name, (err, task) => {
                        if (err) cb(err);
                        else {
                            TaskManager.singleton().addNew(task);
                            res.json({ uuid: req.id });
                            cb();
                        }
                    }, req.body.options,
                    req.body.webhook,
                    req.body.skipPostProcessing === 'true');
                }
            ], err => {
                if (err) die(err.message);
            });
        }
    }
    

    至此,建立一个任务并运行需要调用4个接口.
    当然NodeODM也提供了合而为一的接口,简化调用:

    app.post('/task/new', authCheck, taskNew.assignUUID, taskNew.uploadImages, (req, res, next) => {
        req.body = req.body || {};
        if ((!req.files || req.files.length === 0) && !req.body.zipurl) req.error = "Need at least 1 file or a zip file url.";
        else if (config.maxImages && req.files && req.files.length > config.maxImages) req.error = `${req.files.length} images uploaded, but this node can only process up to ${config.maxImages}.`;
        next();
    }, taskNew.createTask);
    

    相关文章

      网友评论

        本文标题:How it works(10) NodeODM源码阅读(A)

        本文链接:https://www.haomeiwen.com/subject/jsrdmqtx.html