背景:
前端文件上传是非常普遍的功能,当需要上传大文件时会有以下问题。
1.前后端上传时间限制,一次性传输大小限制。
2.网络抖动等,失败后需要重新上传。
3.http1.1版本, TCP只有传送一个请求
4.无进度条,用户体验极差
主要步骤:
前端:
加载文件 ➡️ 分片 ➡️ 上传
node.js:
解析文件 ➡️ 存放文件碎片 ➡️ 合并文件
比如重庆市向上海市订购了一批高铁列车,如果一次性运过来不太现场,没有那么大的船,还有就是一次性运过来,如果路上出事故,需要重新发送一批了,损失严重。
所以我们准备分批发送。
1.浏览器加载文件
<input id="file" type="file" onchange="uploadFile()"id="upload" />
<input type="button" id="upload" value="文件上传" class="btn btn-warning" onclick="handleUpload()" />
这一步主要是把文件读取到内存里。
document.getElementById('file').files 是 FileList类型。
document.getElementById('file').files[0] 是File类型的包装器。
File FileList FileReader关系:
FileReader只能读取 File或者 blob对象,File对象是FileList
的子集,constructor == Blob, 有slice方法。
2.上传文件方式选择
文件上传采用 formData形式,而不是json。原因json传参需要JSON. stringify序列化
比如一下代码:
var xhr = new XMLHttpRequest();
xhr.open('post','http://localhost:3000/ajaxpost');
xhr.setReuqestHeader('Content-Type','application/json');
var params = JSON.stringify({
city: '重庆',
spcial: '山城'
})
xhr.send(params);
xhr.onload = function () {
console.log(xhr.responseText);
}
在序列化过程中,会抹掉一些比如 function File blob的对象,所以采用formData形式进行文件上传。
3.上传
3.1直接上传
- Promise 封装 ajax
const postAjax = (url,fd) => {
const xhr = new XMLHttpRequest();
return new Promise((resolve, reject) => {
xhr.open('POST', url, true);
xhr.onreadystatechange = function() {
if (xhr.readyState == 4 && xhr.status == 200) {
console.log(xhr.responseText, "responseText" )
resolve(xhr.responseText)
}
};
xhr.send(fd);
})
}
const url = "http://127.0.0.1:1000/file/uploading"
function uploadFile() {
const file = document.getElementById('file').files[0];
blockUpload(file)
}
const blockUpload = (file) => {
const fd = new FormData();
fd.append("file", file);
fd.append("fileName", file.name)
postAjax(url, fd);
}
3.2 分片上传
File对象可以使用,slice + File.size,对文件进行切割,切割后的chunk实际上是浏览器对象Blob。
const url = "http://127.0.0.1:1000/file/uploading"
const mergrUrl = "http://127.0.0.1:1000/file/mergrChunk"
const handleUpload = () => {
$("#file").click();
}
function uploadFile() {
const file = document.getElementById('file').files[0];
chunkedUpload(file)
}
- 分片
const chunkedUpload = async (file) => {
const chunkSize = 1024;
for (let start = 0; start <= file.size; start += chunkSize) {
const chunk = file.slice(start, start + chunkSize); // 分片 blob对象
const fd = new FormData();
fd.append("chunk", chunk);
fd.append("hash", start);
fd.append("fileName", file.name)
// 上传 利用async实现,同步请求
let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) {
per = 100;
}
await postAjax(url, fd).then(res => {
$('#bar').css({'width': per + "%",});
$('#bar').html(per + '%');
})
}
此时我们会等待一条船到达重庆,再让下一条船出发,河里同时只有一条船通行,就是说分片请求会等待上一个完成。
3.3 并发上传
为了利用浏览器的并发能力,把请求分批发送,每次并发11个,node.js同一个IP最多可以异步处理11个请求。
const chunkedUpload = async (file) => {
++ const chunkSize = 1024;
++ let postQueue = [];
++ const parallelNum = 11; //谷歌最大线程数量 大于11后提效不明显
for (let start = 0; start <= file.size; start += chunkSize) {
const chunk = file.slice(start, start + chunkSize); // 分片 blob对象
const fd = new FormData();
fd.append("chunk", chunk);
+ fd.append("hash", start); //node.js 接受时做为文件名
fd.append("fileName", file.name)
+ let per = Math.floor(100 * start / file.size );
+ if ((file.size - start) < chunkSize) {
+ per = 100;
+ }
+ // 一个线程使用完,再发送另一个
- await postAjax(url, fd).then(res => {
- })
+ if (postQueue.length < parallelNum) {
+ postQueue.push(postAjax(url, fd))
+ }
+ if (postQueue.length >= parallelNum || per === 100) {
+ // 11个请求并发
+ await Promise.all(postQueue).then(res => {
$('#bar').css({'width': per + "%",});
$('#bar').html(per + '%');
+ postQueue = [];
+ }).catch(err => {
+ console.error(err)
+ })
+ }
+ }
};
此时,我们可以同时发出11条船,等这11条到达重庆,开始下一轮,重新发送11条船,这样就能缩短运输时间啦。
3.4 any版
...
if (postQueue.length < parallelNum) {
- postQueue.push(postAjax(url, fd))
+ postQueue.push({post: (postAjax(url, fd)), hash: start} )
}
let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) {
per = 100;
}
if (postQueue.length >= parallelNum || per === 100) {
// 维持一个请求队列,一个请求完成加入一个,不用等待上一轮完成
+ const postApiQueues = postQueue.map(item => item.post)
await Promise.any(postApiQueues).then(res => {
+ let hash = res.hash
+ const index = postQueue.find(item => item.hash = hash)
+ postQueue.splice(index, 1)
- postQueue = [];
$('#bar').css({'width': per + "%",});
$('#bar').html(per + '%');
if (per >= 100) {
postAjax(mergrUrl, fd).then(res => {
})
}
}).catch(err => {
console.error(err)
})
}
...
把以上代码Promise.all 改成 Promise.any
这样等任何一条船到达重启,我们就可以开始马上让一艘船发货。
4.文件接收
4.1 node.js 接收文件流
- app.js 接收文件流
const express = require("express");
const app = express();
app.use(express.static("public"));
const multiparty = require("multiparty");
const fs = require("fs-extra");
const path = require("path");
const UPLOAD_DIR = path.resolve(__dirname);
app.get("/", (req, res) => {
res.sendFile(`${__dirname}/index.html`);
});
let FILE_NAME = "";
let chunkDir = "";
app.post("/file/uploading", (req, res, next) => {
/* 生成multiparty对象,并配置上传目标路径 */
var form = new multiparty.Form();
form.parse(req, async (err, fields, files) => {
if(err) return;
const [chunk] = files.chunk;
const [hash] = fields.hash;
const [fileName] = fields.fileName;
FILE_NAME = fileName;
chunkDir = path.resolve(UPLOAD_DIR, "fileSteam/fchunkDir" + fileName);
if (!fs.existsSync(chunkDir)) {
await fs.mkdirs(chunkDir);
}
// 文件暂时放入 chunkDir文件夹中
await fs.move(chunk.path, `${chunkDir}/${hash}`);
res.writeHead(200, { "content-type": "text/plain;charset=utf-8" });
res.write("200");
res.end();
});
});
app.use(express.static("public")).listen(1000);
上面的app.js 解析文件,然后临时存放在 chunkDir+文件名的文件夹下
相当于把高铁车的所有零件放入一个独立的仓库,仓库的名字就是高铁的名字,比如复兴号。
4.2 node.js 合并文件流 生成文件
- app.js
app.post("/file/uploading", (req, res, next) => {
......
});
// 合并chunk
+ const stream = require("./writeStream");
+ app.post("/file/mergrChunk", async (req, res, next) => {
+ FILE_NAME = path.resolve(UPLOAD_DIR, "fileSteam/" + FILE_NAME);
+ console.log(FILE_NAME, "========================");
+ let dests = fs.readdirSync(chunkDir);
+ dests = dests.sort((a, b) => a - b);
+ await stream.WriteStreamsAsync(dests, FILE_NAME, chunkDir);
+ await fs.removeSync(chunkDir);
+ res.write("200");
+ res.end();
});
app.use(express.static("public")).listen(1000);
前端文件传送完成,向后端发送一个合并请求,合并前把文件排序一下,文件合并操作在writeStream.js中。
- writeStream.js
const fs = require("fs"); // 引入fs模块
const path = require("path");
/**
* @params dests 文件流
* @params FILE_NAME 生成的文件名
* @params chunkDir 文件路径
*/
const WriteStreamsAsync = async (dests, FILE_NAME, chunkDir) => {
let writeable = fs.createWriteStream(FILE_NAME);
for (let i = 0; i < dests.length; i++) {
await write(dests[i], writeable, chunkDir);
}
};
const write = (item, writeable, chunkDir) => {
return new Promise((resolve, reject) => {
let destPath = path.resolve(__dirname, chunkDir + '/' + item);
let readable = fs.createReadStream(destPath);
readable.pipe(writeable, { end: false });
readable.on("end", () => {
// 关闭流之前立即写入最后一个额外的数据块
resolve();
});
});
};
module.exports = { WriteStreamsAsync };
利用 fs. createReadStream fs. createWriteStream 文件流api合并文件切片,生成文件,大文件上传完成。
这一步相当于把高铁组装起来,复原了。
断点续传
断点续传可以,在文件中断后继续上次的传输节点,继续上传。
在网页刷新后,把上传的节点存储到localStorage中,下次上传从localStorage查找是否有这个文件的节点存在,如果有从这个节点上传,如果没有,重新上传。
const postAjax = (url,fd) => {
const xhr = new XMLHttpRequest();
return new Promise((resolve, reject) => {
xhr.open('POST', url, true);
xhr.onreadystatechange = function() {
if (xhr.readyState == 4 && xhr.status == 200) {
const res = JSON.parse(xhr.responseText)
+ if (res.hash) {
+ window.localStorage.setItem(fileName, res.hash);
+ }
resolve(res)
}
};
xhr.send(fd);
})
}
function uploadFile() {
const file = document.getElementById('file').files[0];
+ let fileName = window.localStorage.getItem('fileName');
+ const pointHash = window.localStorage.getItem(fileName) || 0;
chunkedUpload(file, +pointHash)
}
const chunkedUpload = async (file, pointHash) => {
const chunkSize = 1024 * 10;
let postQueue = [];
const parallelNum = 25; //谷歌最大线程数量 大于11后提效不明显,node.js在1s内最多异步处理11个请求
for (let start = pointHash; start <= file.size; start += chunkSize) {
const chunk = file.slice(start, start + chunkSize); // 分片 blob对象
const fd = new FormData();
fd.append("chunk", chunk);
fd.append("hash", start);
fd.append("fileName", file.name)
+ window.localStorage.setItem('fileName', file.name);
// 线程并发
if (postQueue.length < parallelNum) {
postQueue.push({post: (postAjax(url, fd)), hash: start} )
}
let per = Math.floor(100 * start / file.size );
if ((file.size - start) < chunkSize) {
per = 100;
}
if (postQueue.length >= parallelNum || per === 100) {
const postApiQueues = postQueue.map(item => item.post)
await Promise.any(postApiQueues).then(res => {
let hash = res.hash
const index = postQueue.find(item => item.hash = hash)
postQueue.splice(index, 1)
$('#bar').css({'width': per + "%",});
$('#bar').html(per + '%');
if (per >= 100) {
postAjax(mergrUrl, fd).then(res => {
+ let fileName = window.localStorage.getItem('fileName');
+ window.localStorage.removeItem(fileName);
+ window.localStorage.removeItem('fileName');
})
}
}).catch(err => {
console.error(err)
})
}
}
};
速度对比:
为了便于观测我们先把网络设置成fast3G
这样能保证带宽不会影响传输速度
1.promise.all并行版
并行上传时间: 32.95S
2.any版上传
any上传时间: 25.96S
3.await排队版
排队上传时间: 1.5min
4.直传版
上传时间: 18.88s
可以看出,在传送速度上。
直传版(18.88s) > any版(25.96s) > 并行版(32.95s) > 排队版(1.5min)
结论
1.TCP建立请求,关闭请求是非常费时间的。
2.并行请求速度是排队上传快很多,这个方式是可行的。
网友评论