美文网首页Java
阿里oss文件分片上传

阿里oss文件分片上传

作者: suc浮生 | 来源:发表于2020-03-27 21:43 被阅读0次

OSS文件分片上传

依赖

<!-- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss -->
<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>3.8.1</version>
</dependency>

基础参数dto

/**
 * @author WJL
 */
@Data
@Builder
public class OssParamDTO {

    private String endpoint;
    private String accessKeyId;
    private String accessKeySecret;
    private String bucketName;
    private String folder;
    /**
     * objectName = folder + fileName
     */
    private String objectName;

    /**
     * 上传线程
     */
    private Integer task;

    /**
     * 每个线程处理大小 分片大小
     */
    private Integer number;

}

具体上传方法

小文件上传
public static PutObjectResult uploadFile(OssParamDTO ossParamDTO, InputStream inputStream){
    // 创建OSSClient实例。
    OSS ossClient = new OSSClientBuilder().build(ossParamDTO.getEndpoint(), ossParamDTO.getAccessKeyId(), ossParamDTO.getAccessKeySecret());
    PutObjectResult putObjectResult = null;
    // 上传文件流。
    try {
        putObjectResult = ossClient.putObject(ossParamDTO.getBucketName(), ossParamDTO.getObjectName(), inputStream);
        //权限设置
        ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 关闭OSSClient。
        ossClient.shutdown();
    }
  return  putObjectResult;
}

大文件上传,分片oss自己处理
处理逻辑:前段轮训查询数据库某个字段,当该字段被回调接口更新时结束轮训,上传完成
public static void uploadBigFile(OssParamDTO ossParamDTO,String path, File file,Long fileId) throws Throwable {
    System.out.println("上传时间:"+System.currentTimeMillis());
    // Endpoint以杭州为例,其它Region请按实际情况填写。
    String endpoint = ossParamDTO.getEndpoint();
    // 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
    String accessKeyId = ossParamDTO.getAccessKeyId();
    String accessKeySecret = ossParamDTO.getAccessKeySecret();

    // 创建OSSClient实例。
    OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);

    ObjectMetadata meta = new ObjectMetadata();
    // 指定上传的内容类型。
    meta.setContentType("text/plain");

    // 通过UploadFileRequest设置多个参数。
    UploadFileRequest uploadFileRequest = new UploadFileRequest(ossParamDTO.getBucketName(),ossParamDTO.getObjectName());
    // 指定上传的本地文件。
    uploadFileRequest.setUploadFile(path);
    // 指定上传并发线程数,默认为1。
    uploadFileRequest.setTaskNum(ossParamDTO.getTask());
    // 指定上传的分片大小,范围为100KB~5GB,默认为文件大小/10000。
    uploadFileRequest.setPartSize(ossParamDTO.getNumber() * 1024 * 1024);

    uploadFileRequest.setObjectMetadata(meta);
    // 设置上传成功回调,参数为Callback类型。
    Callback callback = new Callback();
    callback.setCalbackBodyType(Callback.CalbackBodyType.URL);
    //回调参数 --- 同同步到数据库
    callback.setCallbackBody("fileId="+fileId+"&fileName=${object}&uploadStatus=1");
    //回调接口(自己服务器接口,可供外网访问)
    callback.setCallbackUrl("http://3m8wv2.natappfree.cc/web/common/callBack");
    uploadFileRequest.setCallback(callback);

    // 断点续传上传。
    ossClient.uploadFile(uploadFileRequest);
    //权限设置
    ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
    // 关闭OSSClient。
    ossClient.shutdown();
}
大文件本地分片,多线程执行分片上传,再合并碎片
分片上传代码
PartETag getUploadPartETag(String objectName, String bucketName, String uploadId,
                           InputStream instream, Long curPartSize,Integer partNum,
                           OSS ossClient, CountDownLatch countDownLatch){
    long before = System.currentTimeMillis();
    UploadPartRequest uploadPartRequest = null;
    try {
        log.debug("分片文件上传线程: {}",Thread.currentThread().getName());
        uploadPartRequest = new UploadPartRequest();
        uploadPartRequest.setBucketName(bucketName);
        uploadPartRequest.setKey(objectName);
        uploadPartRequest.setUploadId(uploadId);
        uploadPartRequest.setInputStream(instream);
        // 设置分片大小。除了最后一个分片没有大小限制,其他的分片最小为100KB。
        uploadPartRequest.setPartSize(curPartSize);
        // 设置分片号。每一个上传的分片都有一个分片号,取值范围是1~10000,如果超出这个范围,OSS将返回InvalidArgument的错误码。
        uploadPartRequest.setPartNumber(partNum);
        // 每个分片不需要按顺序上传,甚至可以在不同客户端上传,OSS会按照分片号排序组成完整的文件。
        UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
        // 每次上传分片之后,OSS的返回结果会包含一个PartETag。PartETag将被保存到partETags中。
       log.debug("getPartETag  ::{}" ,uploadPartResult.getPartETag().getETag());
       return uploadPartResult.getPartETag();
    }finally {
        countDownLatch.countDown();
        log.debug("线程: {}  执行完毕, 等待线程数 :{}, 消耗时间: {}",
                Thread.currentThread().getName(),countDownLatch.getCount(),
                ((System.currentTimeMillis()-before)/1000)+"s");
    }
}

外部分片代码
@Qualifier("taskExecutor")
@Autowired
ThreadPoolTaskExecutor taskExecutor;
/**
* 上传
* @param ossParamDTO
* @param multipartFile
* @return
*/
public CompleteMultipartUploadResult uploadBigFileForProd(OssParamDTO ossParamDTO, MultipartFile multipartFile){
    Long before = System.currentTimeMillis();
    // Endpoint以杭州为例,其它Region请按实际情况填写。
    String endpoint = ossParamDTO.getEndpoint();
    // 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
    String accessKeyId = ossParamDTO.getAccessKeyId();
    String accessKeySecret = ossParamDTO.getAccessKeySecret();
    String bucketName = ossParamDTO.getBucketName();
    // <yourObjectName>表示上传文件到OSS时需要指定包含文件后缀在内的完整路径,例如abc/efg/123.jpg。
    String objectName = ossParamDTO.getObjectName();

    // 创建OSSClient实例。
    OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);

    // 创建InitiateMultipartUploadRequest对象。
    InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);

    // 初始化分片。
    InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
    // 返回uploadId,它是分片上传事件的唯一标识,您可以根据这个ID来发起相关的操作,如取消分片上传、查询分片上传等。
    String uploadId = upresult.getUploadId();

    // partETags是PartETag的集合。PartETag由分片的ETag和分片号组成。
    List<PartETag> partETags =  new ArrayList<>();
    // 计算文件有多少个分片 15MB
    final long partSize = 2 * 1024 * 1024L;
    long fileLength = multipartFile.getSize();
    int partCount = (int) (fileLength / partSize);
    if (fileLength % partSize != 0) {
        partCount++;
    }
    // 遍历分片上传。
    log.info("分片数量  {}",partCount);
    List<Future<PartETag>> futureList = Collections.synchronizedList(new ArrayList());
    CountDownLatch countDownLatch = new CountDownLatch(partCount);
    for (int i = 0; i < partCount; i++) {
        long startPos = i * partSize;
        long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
        InputStream instream = null;
        try {
            instream = multipartFile.getInputStream();
        }  catch (IOException e) {
            e.printStackTrace();
        }
        // 跳过已经上传的分片。
        try {
            instream.skip(startPos);
        } catch (IOException e) {
            e.printStackTrace();
        }
        int finalI = i;
        InputStream finalInstream = instream;
        Future<PartETag> partETagFuture = taskExecutor.submit(() ->
                fileServiceExtAsync.getUploadPartETag(objectName, bucketName, uploadId, finalInstream, curPartSize, finalI + 1, ossClient, countDownLatch));
        futureList.add(partETagFuture);
    }
    try {
        countDownLatch.await();
        for (Future<PartETag> tagFuture : futureList) {
            partETags.add(tagFuture.get());
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    // 创建CompleteMultipartUploadRequest对象。
    List<PartETag> collect = partETags.stream().sorted(Comparator.comparing(PartETag::getPartNumber)).collect(Collectors.toList());
    // 在执行完成分片上传操作时,需要提供所有有效的partETags。OSS收到提交的partETags后,会逐一验证每个分片的有效性。当所有的数据分片验证通过后,OSS将把这些分片组合成一个完整的文件。
    log.debug("文件开始合并");
    CompleteMultipartUploadRequest completeMultipartUploadRequest =
            new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, collect);

    // 如果需要在完成文件上传的同时设置文件访问权限,请参考以下示例代码。
    completeMultipartUploadRequest.setObjectACL(CannedAccessControlList.PublicRead);
    // 完成上传。
    CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest);

    // 关闭OSSClient。
    ossClient.shutdown();
    log.debug("消耗总时间:  {}",((System.currentTimeMillis()-before)/1000)+"s");
    return completeMultipartUploadResult;
}

相关文章

网友评论

    本文标题:阿里oss文件分片上传

    本文链接:https://www.haomeiwen.com/subject/pzaruhtx.html