项目中涉及视频文件上传,相关服务端给的主要接口是 创建上传文件——分片上传文件流,一个大文件按照一定的大小分割每次上传这分割一部分,就需要多线程处理。
##
ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(count);
for (int i = 0; i < count; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// do something
}
});
}
//指定位置分片文件流 (读取文件的指定范围)
//文件上传读流方式有 写入其他参数的multipart/form-data,也有只要文件流的 application/octet-stream,此处用application/octet-stream,只读取文件流。
ByteArrayOutputStream dos = null;
try {
dos = new ByteArrayOutputStream();
File file = new File(filePath);
RandomAccessFile raf = new RandomAccessFile(file, "r");
long fileLength = file.length();
long startPosition = 0L;
int blockSize = videoResult.blocksize;
int total_block = videoResult.total_block;
int current_block = videoResult.current_block;
startPosition = (current_block - 1 ) * blockSize;
long endPosition = (startPosition + blockSize) >= fileLength ? fileLength : (startPosition + blockSize);
raf.seek(startPosition);
blockSize+"=="+total_block+"=="+raf.getFilePointer());
int read = 0;
byte[] bytes = new byte[1024];
while (raf.getFilePointer() < endPosition && (read = raf.read(bytes)) != -1){
dos.write(bytes,0,read);
}
return dos.toByteArray();
}catch (Exception e){
return null;
}finally {
if(dos != null){
try {
dos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
经过上传测试,小文件正常,大点的文件,由于开启多个线程,执行的顺序随机性的在某个位置发生错乱,即开启多线程后,线程的执行是无法控制的,服务端接收的分片是无序的,就给合并文件造成了困难。当然理论上,这种按多线程同时执行,上传速度显然更快,合并文件也可以有其他方式正常完成。但是目前,本模块服务端设计如此,就是需要确保开启多线程,线程任务按自然顺序进行,按此需求模型,经过查找资料,引入Semaphore,指定Semaphore semaphore = new Semaphore(1); 单通道执行。一个线程执行完开放通道执行下一个
executors = new ScheduledThreadPoolExecutor(remainCount);
Semaphore[] syncObjects = new Semaphore[remainCount];
for(int i = 0; i < remainCount; i++){
syncObjects[i] = new Semaphore(1);
try {
if (i != remainCount - 1){
syncObjects[i].acquire();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//保证 分片自然顺序 分片从1开始
//此处
int startBlock = 1;
//循环中 total_block 就要 加 1;
total_block += 1;
for (int i = startBlock; i < total_block; i++) {
//syncObjects 索引从0开始,此处就要再减去 total_block加上的 1
final Semaphore lastSemphore = i == startBlock ? syncObjects[total_block - 1 - 1] : syncObjects[i - 1 - startBlock];
final Semaphore currentSemphore = syncObjects[i - startBlock];
VideoResult.Result result = new VideoResult.Result(subResult);
result.current_block = i;
final VideoResult.Result paramResult = result;
executors.execute(new Runnable() {
@Override
public void run() {
try {
lastSemphore.acquire();
uploadFilePartRequest(paramResult);
currentSemphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
这个lastSemphore 和currentSemphore 就是每次执行的上一个信号通道,和当前的通道。
线程执行前都是默认锁定
初始默认为除了最后一个其他都锁定占用通道.png
执行开始,上一个锁定,当前的执行完释放。
第一次为最后一个锁定,第一次执行完第一个再释放。
这样就保证了线程池循环开启线程执行时能够保证自然顺序,即和循环的顺序一致。
PS:查资料后这样使用是可以实现需求效果,然而其根本原理,和加锁的区别,目前属实不是十分清楚。2019.12.13 11:50
网友评论