附件分片下载、断点续传
场景引入
在最近接触的一个项目中碰到这样的场景一个加密过的文件存储在FastDFS中,在业务处理过程中需要将对应的附件下载下来并解密之后做相应的业务处理。
原有的处理方式是单线程模式下载并做相应的业务处理,当文件比较大时例如10G+
的文件此时就会出现性能瓶颈。故此引入分片下载
处理方案,开启多个线程同步下载附件,提高下载效率。
以下示例是是剥离业务功能后写的一个附件分片下载、断点续传demo
涵盖了核心实现原理
附件分片下载、断点续传 实现原理
核心实现原理
-
获取附件信息:附件大小、是否需要分片处理、详细分片信息、
附件MD5
-
更具附件ID获取附件信息
-
通过分片阈值判断是否需要进行分片处理,需要分片处理则获取详细的分片信息
-
计算
附件MD5
或每个分片的MD5
,并缓存起来(相同附件ID
直接从缓存中获取MD5
)
-
-
根据步骤
1
获取的信息请求附件下载接口下载附件-
判断磁盘空间是否充裕
-
根据分片信息调用下载接口下载附件,使用线程池执行下载合理高效使用计算机处理能力
-
比对每个分片的
MD5
,所有分片下载成功后合并附件(可每个分片下载成功直接进入合并附件的执行步骤
)。
-
-
分片下载完成后合并附件,并通过
附件MD5
校验附件下载的一致性 -
相应的业务处理
工业级代码考虑点
-
duild once ,Run everywhere
为适应不同服务器公共的性能参数做到可配置:下载缓冲区大小、分片阈值、每个分片大小、下载存储临时目录、线程池大小、线程池队列大小 -
下载前做磁盘空间预判
-
保证每个分片下载的正确性(
校验分片MD5
) 并保证最终合并附件的正确性 -
利用
HTTP 206 Partial Content
实现断点续传
代码实现
获取附件信息
/**
* 获取附件信息
*
* @return 附件详细信息
*/
@RequestMapping("/getFileDetailInfo")
public FileDetailInfo getFileDetailInfo(String fileId) {
FileDetailInfo fileDetailInfo = new FileDetailInfo();
//附件id 获取附件信息
String path = getFilePath(fileId);
File file = new File(path);
if (file.exists() && file.isFile()) {
fileDetailInfo.setSize(file.length());
fileDetailInfo.setFilePath(path);
fileDetailInfo.setFileName(file.getName());
fileDetailInfo.setFileMd5(FileUtils.getFileMd5(path));
if (file.length() > fileOperationProperties.getMultipartSizeLimit()) {
fileDetailInfo.setMultipart(true);
//超过分块阈值限制 进行附件分块
long alreadyPartLength = 0;
int part = 0;
while (alreadyPartLength<file.length()) {
//起始分块位置
long off = alreadyPartLength;
//已分块的长度
alreadyPartLength = (alreadyPartLength + fileOperationProperties.getMultipartSize()) > file.length() ?
file.length() : alreadyPartLength + fileOperationProperties.getMultipartSize();
MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
multiPartFileInfo.setFileName(file.getName()+part);
multiPartFileInfo.setLen(alreadyPartLength - off);
//分块的MD5
multiPartFileInfo.setFileMd5(FileUtils.getFileMd5(file,multiPartFileInfo.getFileName() ,
off ,multiPartFileInfo.getLen() ));
multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
part += 1;
multiPartFileInfo.setOff(off);
fileDetailInfo.addMultiPart(multiPartFileInfo);
}
}else{
fileDetailInfo.setMultipart(false);
}
}else {
throw new RuntimeException("操作异常!非文件或文件不存在");
}
return fileDetailInfo;
}
下载功能 包含断点续传
/**
* 附件下载
*/
@RequestMapping("/downloadFile")
public void downloadFile(@RequestBody MultiPartFileInfo multiPartFileInfo , HttpServletResponse response , HttpServletRequest request) {
//FIXME 优化代码
File file = new File(multiPartFileInfo.getFilePath());
if (file.exists() && file.isFile()) {
OutputStream out = null;
RandomAccessFile in = null;
//下载起始位置
long off = 0;
int downloadSize = 0;
try {
response.setContentType("application/octet-stream");
response.setHeader("Content-disposition", String.format("attachment; filename=\"%s\"",
new String(multiPartFileInfo.getFileName().getBytes("UTF-8"), "ISO8859-1")));
response.setHeader("Accept-Ranges", "bytes");
if (request.getHeader("Range") == null) {
response.setHeader("Content-Length", String.valueOf(multiPartFileInfo.getLen()));
}else {
//解析断点续传
String range = request.getHeader("Range");
String[] bytes = range.replaceAll("bytes", "").split("-");
off = Long.parseLong(bytes[0]);
long end = 0;
if (bytes.length == 2) {
end = Long.parseLong(bytes[1]);
}
int length = 0;
if (end != 0 && end>off) {
length = Math.toIntExact(end - off);
}else{
length = Math.toIntExact(multiPartFileInfo.getLen() - off);
}
response.setHeader("Content-Length", String.valueOf(length));
downloadSize = length;
}
in = new RandomAccessFile(file,"rw");
out = response.getOutputStream();
if (off == 0) {
off = multiPartFileInfo.getOff();
}
if (downloadSize == 0) {
downloadSize = Math.toIntExact(multiPartFileInfo.getLen());
}
byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
int length = 0;
//设置下载起始位置
if (multiPartFileInfo.getOff() > 0) {
in.seek(off);
}
//预防读取超出分块范围大小
long readContentLen = 0;
if ((readContentLen + fileOperationProperties.getReadBufLenSize()) > downloadSize) {
bytes = new byte[Math.toIntExact(multiPartFileInfo.getLen() - readContentLen)];
}
while ((length = in.read(bytes)) !=-1 ) {
out.write(bytes,0,length);
readContentLen += length;
}
out.flush();
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Junit下载测试用例
@Test
public void testFileUploadNotPart() {
try {
//Step1 查询分片信息
MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.post("/fileOperation/getFileDetailInfo").param("fileId", "1"))
.andExpect(MockMvcResultMatchers.status().isOk())
.andDo(MockMvcResultHandlers.print())
.andReturn();
String body = mvcResult.getResponse().getContentAsString();
FileDetailInfo fileDetailInfo = new JSONObject().parseObject(body, FileDetailInfo.class);
//创建临时存储文件夹
File directFile = new File(fileOperationProperties.getTempDirect());
if (!directFile.exists()) {
directFile.mkdirs();
}
if (FileUtils.enoughFreeSpace(fileOperationProperties.getTempDirect(), (long) (fileDetailInfo.getSize() * 1.3))) {
//Step2 下载附件
if (fileDetailInfo.isMultipart()) {
//Step2.1 分片下载
int nThreads = fileOperationProperties.getNThreads();
int threadPoolQueueCapacity = fileOperationProperties.getThreadPoolQueueCapacity();
ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPoolQueueCapacity));
//Step2.1.1 线程池中下载
//FIXME 优化:1、获取的分片信息是个有序集合,按照一定的顺序下载。-->及时处理已下载完成的分片,尽可能合理利用存储空间
//FIXME 优化:2、下载完成一个分片 合并处理一个分片 不适用阻塞
Queue<Future<FileUploadResultModel>> futureQueue = new ConcurrentLinkedQueue<Future<FileUploadResultModel>>();
for (MultiPartFileInfo multiPartFileInfo : fileDetailInfo.getMultiPartFileInfos()) {
Future<FileUploadResultModel> futreTask = executorService.submit(new FileJob(restTemplate,
multiPartFileInfo, fileOperationProperties));
futureQueue.add(futreTask);
}
int sizeOfSuccessPartFile = 0;
for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
FileUploadResultModel resultModel = resultModelFuture.get();
if (resultModel.isSuccess()) {
sizeOfSuccessPartFile += 1;
} else {
//下载失败处理--记录下载失败原因/重试下载
}
}
if (sizeOfSuccessPartFile == fileDetailInfo.getMultiPartFileInfos().size()) {
//Step2.1.2 下载成功 合并附件
String filePath = fileOperationProperties.getTempDirect() + File.separator + fileDetailInfo.getFileName();
//预创建与源文件相同大小的文件
File file = new File(filePath);
if (file.exists() && file.isFile()) {
file.delete();
file.createNewFile();
} else {
file.createNewFile();
}
//FIXME 此处使用多线程合并文件,提高合并处理效率
RandomAccessFile rFile = new RandomAccessFile(file, "rw");
rFile.setLength(fileDetailInfo.getSize());
for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
FileUploadResultModel fileUploadResultModel = resultModelFuture.get();
MultiPartFileInfo multiPartFileInfo = fileUploadResultModel.getMultiPartFileInfo();
//设置写入起始位置
rFile.seek(multiPartFileInfo.getOff());
byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
int length = 0;
File tempFile = new File(fileUploadResultModel.getLocalFilePath());
InputStream TempFileInputStream = new FileInputStream(tempFile);
while ((length = TempFileInputStream.read(bytes)) != -1) {
rFile.write(bytes, 0, length);
}
TempFileInputStream.close();
tempFile.delete();
}
//Step2.1.3 校验附件
if (FileUtils.checkFile(filePath, fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
log.info("附件下载成功!附件本地目录 {}", filePath);
}
}
} else {
//step2.2 整个附件下载
MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
multiPartFileInfo.setOff(0);
multiPartFileInfo.setFileName(fileDetailInfo.getFileName());
multiPartFileInfo.setFileMd5(fileDetailInfo.getFileMd5());
multiPartFileInfo.setLen(fileDetailInfo.getSize());
//下载附件
FileJob fileJob = new FileJob(restTemplate,
multiPartFileInfo, fileOperationProperties);
FileUploadResultModel resultModel = fileJob.uploadFile();
//step3 校验附件MD5
if (FileUtils.checkFile(resultModel.getLocalFilePath(), fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
log.info("附件下载成功!附件本地目录 {}", resultModel.getLocalFilePath());
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
结尾
以上是整个分片下载、断点续传的实现原理及其实现。
在上诉实现中还有可优化的点
-
获取的分片信息时使用有序集合,按照一定的顺序下载。-->及时处理已下载完成的分片,尽可能合理利用存储空间
-
下载完成一个分片 合并处理一个分片 不使用阻塞,减少磁盘空间的占用
-
可使用多线程处理附件合并
网友评论