任务运行时
上一节的最后,通过"TaskManager.singleton().addNew(task);"方法,新建的任务被加入进了任务管理器,自此,任务的一切都交由任务管理器来调控了.
由上方的代码可以看出,TaskManager是一个单例实例.原因是保证当Taskmanager(下面简称任务管理器)在不同模块被引用时,都指向唯一的任务管理器实例.
//在整个应用启动时,执行TaskManager.initialize()
//在其他任何时候,执行TaskManager.singleton()即可获取实例
module.exports = {
singleton: function(){ return taskManager; },
initialize: function(cb){
taskManager = new TaskManager(cb);
}
};
任务最终运行
任务加入任务管理器时发生了什么呢:
addNew(task){
//将任务加入任务队列
this.tasks[task.uuid] = task;
//调度函数
this.processNextTask();
}
可以看出,任务没有被直接执行,只是进入了一个队列进行排队.
最终何时运行,完全依靠任务管理器的调度:
//寻找下一个可以执行的任务
//触发时机:任务被添加时,任务完成时,任务被手动取消/移除/重启时
processNextTask(){
//如果当前运行的任务数量小于最大任务数
if (this.runningQueue.length < config.parallelQueueProcessing){
//在任务列表中寻找第一个状态是"排队中"的任务
let task = this.findNextTaskToProcess();
if (task){
//将该任务添加到正在运行任务列表
this.addToRunningQueue(task);
//运行任务
task.start(() => {
//当任务结束时调用webhooks
task.callWebhooks();
//将本任务从正在运行任务列表移除
this.removeFromRunningQueue(task);
//寻找下一个排队中的任务
this.processNextTask();
});
//如果本节点可允许同时允许的任务较多,就再检查一遍
if (this.runningQueue.length < config.parallelQueueProcessing) this.processNextTask();
}
}
}
//将本任务从正在运行任务列表移除
removeFromRunningQueue(task){
//将自己从列表里排除
this.runningQueue = this.runningQueue.filter(t => t !== task);
}
任务的取消/移除/重启
任务运行后的操作相对就简单多了,基本上调用task的对于方法:
cancel(uuid, cb){
//获取指定的任务
let task = this.find(uuid, cb);
if (task){
if (!task.isCanceled()){
//调用任务的取消方法
task.cancel(err => {
//从正在运行列表移除
this.removeFromRunningQueue(task);
//寻找下一个可执行任务
this.processNextTask();
cb(err);
});
}else{
cb(null);
}
}
}
// 移除任务之前自动取消任务
remove(uuid, cb){
this.cancel(uuid, err => {
if (!err){
let task = this.find(uuid, cb);
if (task){
//删除目录
task.cleanup(err => {
if (!err){
//从列表中移除该任务
delete(this.tasks[uuid]);
//寻找下一个可运行任务
this.processNextTask();
cb(null);
}else cb(err);
});
}
}else cb(err);
});
}
// 重新启动状态为已取消或已失败的任务
restart(uuid, options, cb){
let task = this.find(uuid, cb);
if (task){
//调用任务的方法,将任务切换为排队中状态
task.restart(options, err => {
//寻找下一个可运行任务
if (!err) this.processNextTask();
cb(err);
});
}
}
任务本身
以上一直在讨论如何操作任务之外的环境及如何调用任务的各种功能,现在将视线回归到任务,所有环境的准备都是为了任务,所有命令的执行者也是任务.
其实对于一个具体任务,与整体类似,也分如下几个步骤:
- 准备照片和控制点文件等运行前准备
- 运行任务
- 响应诸如停止,移除,反馈进度等命令
运行前准备
运行前做的工作很简单:
- 初始化任务描述信息,如创建时间
- 获取所有图片的路径
- 获取所有控制点描述文件的路径.
运行任务
整个程序的核心就在这里了.而任务运行又是由若干子任务的运行构成的:
- 调用ODM命令处理图片
- 生成完成后调用后处理脚本
- 将最终成果打包到zip中
先看看这些子任务是如何实现的:
调用ODM命令处理图片
ODM本身并不包含在NodeODM项目中,而是安装为命令行工具,因此,这一步其实是调用命令行的ODM命令进行处理:
module.exports = {
run: function(options, projectName, done, outputReceived){
//待运行的脚本的路径
const command = path.join(config.odm_path, "run.sh"),
params = [];
//拼接参数
for (var name in options){
let value = options[name];
params.push("--" + name);
if (typeof value !== 'boolean'){
params.push(value);
}
}
params.push(projectName);
//调用子进程运行ODM脚本
let childProcess = spawn(command, params, {cwd: config.odm_path});
childProcess
.on('exit', (code, signal) => done(null, code, signal))
.on('error', done);
//将子进程的输出指回主进程
childProcess.stdout.on('data', chunk => outputReceived(chunk.toString()));
childProcess.stderr.on('data', chunk => outputReceived(chunk.toString()));
return childProcess;
}
}
调用后处理脚本
与执行ODM脚本相比还更简单一些:
function makeRunner(command, args){
return function(options, done, outputReceived){
let commandArgs = args;
if (typeof commandArgs === 'function') commandArgs = commandArgs(options);
// 运行命令,绑定输入输出
let childProcess = spawn(command, commandArgs);
childProcess
.on('exit', (code, signal) => done(null, code, signal))
.on('error', done);
childProcess.stdout.on('data', chunk => outputReceived(chunk.toString()));
childProcess.stderr.on('data', chunk => outputReceived(chunk.toString()));
return childProcess;
};
}
module.exports = {
runPostProcessingScript: makeRunner(path.join(__dirname, "..", "scripts", "postprocess.sh"), options => [options.projectFolderPath])
};
将最终成果打压缩包
const createZipArchive = (outputFilename, files) => {
return (done) => {
//输出进度
this.output.push(`Compressing ${outputFilename}\n`);
//创建写入流
let output = fs.createWriteStream(this.getAssetsArchivePath(outputFilename));
let archive = archiver.create('zip', {
zlib: { level: 1 } // 1是最快速度,因为这些图片的压缩率已经很高了,没有更多的压缩空间了
});
//压缩完成就回调
archive.on('finish', () => {
done();
});
//错误时完成
archive.on('error', err => {
done(err);
});
//通过管道将zip流和输出流绑定
archive.pipe(output);
let globs = [];
//输出路径
const sourcePath = this.getProjectFolderPath();
//遍历传入的所有文件路径
files.forEach(file => {
let filePath = path.join(sourcePath, file);
// 跳过不存在的文件
if (!fs.existsSync(filePath)) return;
//匹配含有"*"的路径,判断是不是globs表达式
//globs模块可以像ls命令一样查利用*.jpg这种表达式查找文件
let isGlob = /\*/.test(file)
//判断是不是文件夹
let isDirectory = !isGlob && fs.lstatSync(filePath).isDirectory();
//直接压缩文件夹
if (isDirectory){
archive.directory(filePath, file);
}else if (isGlob){
globs.push(filePath);
}else{
//直接压缩文件
archive.file(filePath, {name: file});
}
});
// 如果有globs表达式,就最后处理
if (globs.length !== 0){
let pending = globs.length;
//遍历每一种globs表达式
globs.forEach(pattern => {
glob(pattern, (err, files) => {
if (err) done(err);
else{
files.forEach(file => {
if (fs.lstatSync(file).isFile()){
archive.file(file, {name: path.basename(file)});
}else{
logger.debug(`Could not add ${file} from glob`);
}
});
//如果处理结束就完成压缩
if (--pending === 0){
archive.finalize();
}
}
});
});
}else{
archive.finalize();
}
};
};
看完了子任务的实现,再来看看这些任务是如何调度的:
function start(done) {
const finished = err => {
this.stopTrackingProcessingTime();
done(err);
};
//后处理任务
const postProcess = () => {
//执行后处理脚本
const runPostProcessingScript = () => {
return (done) => {
//添加到runningProcesses数组中
//runningProcesses存储了所有子进程,可以方便统一管理
this.runningProcesses.push(
//运行脚本,实现诸如切图等对生成成功进行的操作
processRunner.runPostProcessingScript({
projectFolderPath: this.getProjectFolderPath()
}, (err, code, signal) => {
if (err) done(err);
else {
if (code === 0) done();
else done(new Error(`Process exited with code ${code}`));
}
}, output => {
//将运行日志导入output数组
this.output.push(output);
})
);
};
};
// 任务文件夹下所有的子文件夹名
let allPaths = ['odm_orthophoto/odm_orthophoto.tif', 'odm_orthophoto/odm_orthophoto.mbtiles',
'odm_georeferencing', 'odm_texturing',
'odm_dem/dsm.tif', 'odm_dem/dtm.tif', 'dsm_tiles', 'dtm_tiles',
'orthophoto_tiles', 'potree_pointcloud', 'images.json'
];
//记录所有要进行的后处理任务
let tasks = [];
//先运行后处理脚本,再执行文件压缩
if (!this.skipPostProcessing) tasks.push(runPostProcessingScript());
tasks.push(createZipArchive('all.zip', allPaths));
async.series(tasks, (err) => {
if (!err) {
this.setStatus(statusCodes.COMPLETED);
finished();
} else {
this.setStatus(statusCodes.FAILED);
finished(err);
}
});
};
if (this.status.code === statusCodes.QUEUED) {
//更新任务状态与已运行时长
this.startTrackingProcessingTime();
this.setStatus(statusCodes.RUNNING);
//构建任务参数
let runnerOptions = this.options.reduce((result, opt) => {
result[opt.name] = opt.value;
return result;
}, {});
runnerOptions["project-path"] = fs.realpathSync(Directories.data);
if (this.gcpFiles.length > 0) {
runnerOptions.gcp = fs.realpathSync(path.join(this.getGcpFolderPath(), this.gcpFiles[0]));
}
//首先运行的是ODM脚本
this.runningProcesses.push(odmRunner.run(runnerOptions, this.uuid, (err, code, signal) => {
if (err) {
this.setStatus(statusCodes.FAILED, {
errorMessage: `Could not start process (${err.message})`
});
finished(err);
} else {
//如果ODM脚本正常结束,则进行后处理
if (this.status.code !== statusCodes.CANCELED) {
if (code === 0) {
postProcess();
} else {
this.setStatus(statusCodes.FAILED, {
errorMessage: `Process exited with code ${code}`
});
finished();
}
} else {
finished();
}
}
}, output => {
//格式化输出
output = output.replace(/\x1b\[[0-9;]*m/g, "");
output.trim().split('\n').forEach(line => {
this.output.push(line.trim());
});
}));
return true;
} else {
return false;
}
}
响应命令
取消任务
取消任务是通过杀进程的方式,杀死正在运行的ODM进程,或后处理进程,当程序正处在最后的压缩阶段时,因为不是采用子进程的方式,取消是无效的.
cancel(cb){
if (this.status.code !== statusCodes.CANCELED){
let wasRunning = this.status.code === statusCodes.RUNNING;
this.setStatus(statusCodes.CANCELED);
if (wasRunning) {
//杀死所有正在运行的子进程
this.runningProcesses.forEach(proc => {
if (proc) kill(proc.pid);
});
this.runningProcesses = [];
}
//停止更新时间
this.stopTrackingProcessingTime(true);
cb(null);
}else{
cb(new Error("Task already cancelled"));
}
}
调用webhooks
因为ODM任务一般耗时很长,因此在任务结束时,可以通过网络钩子将结束信息发送到指定端口,提醒任务已经结束
function callWebhooks() {
const hooks = [this.webhook, config.webhook];
//获取任务的图片描述信息
this.readImagesDatabase((err, images) => {
if (!images) images = [];
//获取任务的基本信息和状态
let json = this.getInfo();
json.images = images;
//遍历所有的webhook
hooks.forEach(hook => {
if (hook && hook.length > 3) {
//设定最多进行5次提醒调用
const notifyCallback = (attempt) => {
//失败超过5次就不再继续尝试
if (attempt > 5) {
logger.warn(`Webhook invokation failed, will not retry: ${hook}`);
return;
}
//发起请求,将任务信息传递给该webhook
request.post(hook, {
json
},
(error, response) => {
if (error || response.statusCode != 200) {
logger.warn(`Webhook invokation failed, will retry in a bit: ${hook}`);
//出错就隔一段时间再请求一次
setTimeout(() => {
notifyCallback(attempt + 1);
}, attempt * 5000);
} else {
logger.debug(`Webhook invoked: ${hook}`);
}
});
};
notifyCallback(0);
}
});
});
}
结语
NodeODM并不是个复杂的系统,但是关于任务的细节比较完善.尽管如此,在实际使用webODM调用NodeODM时,还是会遇到各种无法响应命令的问题.猜测原因在于通过子进程调用ODM脚本的方式是不能保证绝对的应答响应,NodeODM的正常运行不能保证ODM就能正常运行,甚至两者都正常运行也难以保证两者的通信链路是否不会出现中断.
因为ODM本身是Python的库,假设ODM本身就封装了NodeODM这些对外的接口,webODM调用起来或许会更加稳定.
网友评论