前言
最近维护一个老项目中的微信公众号h5的新需求,项目是Node.js服务,Node层负责前端路由、api聚合以及用户信息校验等工作,项目较大,上一次维护已经是3年前,现在不方便重构整个项目,新需求也依赖Node层做路由和api管理,因此需要在原项目内做开发。
原项目的前端资源由fis3输出到指定目录,其中js、css、img资源会上传至cdn,由另一台服务器nginx代理(文件上传服务会由另一个Node服务处理),Node层只负责解析html的解析,以减少Node层业务的压力。新需求基于Vue开发,需要webpack打包,因此需要通过其他方法上传静态资源到nginx的服务器。
文件上传请求是一个异步操作,那么是否可以通过并发请求,加快上传的速度呢?新需求是多页面,资源有上百个文件,同时并发上百个请求会对服务器造成过大压力,我们需要一定的机制让请求排队,除此以外,当上传失败时,还需要一定的重试能力。面对这些问题,接下来研究一下如何实现上述的需求。
尝试与学习
这里我参考了一篇并发请求的实践,可以先了解一下原文:
在大佬的基础上做了一些简单的改造,增加了重试的功能,同时项目开发中也遇到了一些小问题,以下是具体的实现,有注释说明:
// 任务队列
class RequestQueue {
constructor(maxLimit = 5, retry = 2) {
// 最大并发量
this.maxLimit = maxLimit
// 重试次数
this.retry = retry
// blocking queue 若当前请求并发量已经超过maxLimit,则将请求延迟到下某个任务完成,再执行该队列任务
this.requestQueue = []
// 当前并发量数目
this.currentConcurrent = 0
// 说明1:
// 实际请求中,可能会异步的抛出多个error
// 任务重试过程中,当catch到 error且 重试已到上限,会执行 next() 执行下一个任务,
// 此时,如果有异常抛出前一个异步任务的,会无法捕获
// 因此通过全局时间捕获剩余的异步异常
process.on("unhandledRejection", function(e){
console.log(e);
})
}
async run(request) {
// 并发限制
if (this.currentConcurrent >= this.maxLimit) {
await this.startBlocking() // 等待执行,直到某个任务执行this.next()
}
// 队列+1
this.currentConcurrent++
// 设置队列中同一个任务尝试次数
for (let retryCount = this.retry; retryCount > 0; retryCount--) {
let done = false
console.log('[ retryCount ]:' + retryCount)
try {
// 这里与大佬的方法有不同,这里需要传入一个包装好请求的Promise实例,如有需要也可以用pify将请求转成promise
const result = await request()
// 执行成功则结束尝试
done = true
return Promise.resolve(result)
// 如果有错误,会被捕获,不会执行resolve
} catch (error) {
console.log('[ request error ] - ' + error)
// 最后一次重试失败时停止重试,返回报错
if (retryCount === 1) {
done = true
return Promise.reject(error) // 错误只会抛出一次
}
} finally {
// 如果已经结束重试,执行请求队列的下一个任务
if (done) {
this.currentConcurrent--
this.next()
break;
}
}
}
}
next() {
if (this.requestQueue.length <= 0) return
const resolve = this.requestQueue.shift()
resolve() // 取出block promise 的resolve 执行
}
startBlocking() {
let _resolve
let promise2 = new Promise((resolve) => (_resolve = resolve))
this.requestQueue.push(_resolve)
return promise2 // 返回block promise 用于暂停队列的执行
}
}
使用方式:
const request = () => {
return new Promise((resolve, reject) => {
setTimeout(() => { resolve() }, 1000)
})
}
const instance = new RequestQueue()
const promises = []
for (let i = 0; i < 100; i++ ) {
promises.push(instance.run(request)
.catch(err => {
// 这里是否catch(err)取决于是否允许某个任务失败时,其他任务继续执行
console.log(err)
})
)
}
Promise.all(promises)
.catch(err => console.log(err))
// 如果前面的push过程中不catch,则一旦有任务抛出错误,剩余的任务不再执行
整个实现如上,与大佬的实践略有不同,仅供学习。实际生产中,更推荐使用开源社区成熟的库,async,这个库提供更全面的异步流控制,便于我们进行开发。
网友评论