美文网首页
Oss多线程分片方式上传文件

Oss多线程分片方式上传文件

作者: 艺术类架构师 | 来源:发表于2022-08-01 15:55 被阅读0次

    /**
     * Factory for {@link OSSClient} instances bound to a bucket.
     */
    public class OssClientFactory {

        /**
         * Builds an OSS client and issues a create-bucket call for {@code bucket}.
         *
         * <p>If the bucket call fails, the freshly built client is shut down before
         * the exception is rethrown, so a failed call does not leak the client.
         *
         * @param endpoint        OSS endpoint passed to the client builder
         * @param accessKeyId     access key id passed to the client builder
         * @param accessKeySecret access key secret passed to the client builder
         * @param bucket          name of the bucket to create
         * @return the client; the caller is responsible for calling {@code shutdown()}
         */
        public static OSSClient createOssClient(String endpoint,
                                                String accessKeyId,
                                                String accessKeySecret,
                                                String bucket) {
            // Create the OSSClient instance.
            OSSClient ossClient =
                    (OSSClient) new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
            try {
                ossClient.createBucket(bucket);
            } catch (RuntimeException | Error e) {
                // The caller never receives the client on this path, so release it here.
                ossClient.shutdown();
                throw e;
            }
            return ossClient;
        }
    }

    package com.jielu.aliyun.oss;

    import com.aliyun.oss.OSSClient;
    import com.aliyun.oss.model.*;
    import com.jielu.leetcode.NamedThreadFactory;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.*;

    /**
     * Uploads the parts of an OSS multipart upload concurrently on a dedicated thread pool.
     *
     * <p>The pool is shut down at the end of {@link #genPartTag(List)}. Tasks submitted
     * to this instance afterwards are handed to the rejection handler, which runs them
     * on the calling thread.
     */
    public class OssMultipleThreadUploadExecutor {

        private final OSSClient ossClient;

        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10,
                50,
                1000 * 60,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory("thread-oss-upload-file"),
                // Rejected tasks are executed synchronously by the submitting thread.
                (r, executor) -> r.run()
        );

        public OssMultipleThreadUploadExecutor(OSSClient ossClient) {
            this.ossClient = ossClient;
        }

        /**
         * Uploads every request in the list and returns the resulting part tags
         * sorted by ascending part number.
         *
         * <p>Every element of the list is uploaded, including index 0. The method
         * returns only after all uploads have finished, and a failure in any upload
         * is propagated to the caller instead of being lost on a worker thread.
         *
         * @param uploadPartResultList the part upload requests to execute
         * @return the part tags of all uploaded parts, ordered by part number
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws RuntimeException     if any part upload fails; a non-runtime cause is wrapped
         */
        public List<PartETag> genPartTag(List<UploadPartRequest> uploadPartResultList) throws InterruptedException {
            List<Future<PartETag>> pending = new ArrayList<>(uploadPartResultList.size());
            try {
                for (UploadPartRequest request : uploadPartResultList) {
                    Callable<PartETag> task = () -> ossClient.uploadPart(request).getPartETag();
                    pending.add(threadPoolExecutor.submit(task));
                }

                // Results are gathered on this thread only, so a plain ArrayList is safe.
                List<PartETag> partETags = new ArrayList<>(pending.size());
                for (Future<PartETag> future : pending) {
                    partETags.add(future.get());
                }
                Collections.sort(partETags,
                        (left, right) -> Integer.compare(left.getPartNumber(), right.getPartNumber()));
                return partETags;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new RuntimeException(cause);
            } finally {
                // No-op for completed uploads; stops outstanding ones after a failure or interrupt.
                for (Future<PartETag> future : pending) {
                    future.cancel(true);
                }
                threadPoolExecutor.shutdown();
            }
        }

    }

    package com.jielu.aliyun.oss;

    import com.aliyun.oss.OSSClient;
    import com.aliyun.oss.model.*;
    import org.apache.commons.lang3.time.FastDateFormat;
    import org.springframework.stereotype.Component;
    import org.springframework.web.multipart.MultipartFile;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Locale;

    @Component
    public class OssUtil {

        /**
         * Size in bytes of every part except the last one.
         * NOTE(review): OSS documents a 100 KB minimum for non-final parts, so the
         * previous value of 1024 would be rejected for multi-part files; confirm the
         * preferred size for this deployment.
         */
        private static final int PART_SIZE_BYTES = 1024 * 1024;

        /** Key prefix for uploaded objects; written without a leading slash. */
        private static final String KEY_PREFIX = "lycol/upload/oss/";

        /**
         * Uploads each file to OSS as its own multipart upload and returns the
         * location reported by OSS for each completed object.
         *
         * <p>Every file gets its own object key and its own upload id. If a file
         * fails, its multipart upload is aborted and the failure is rethrown wrapped
         * in a {@link RuntimeException}. The OSS client is always shut down before
         * this method returns.
         *
         * @param multipartFileList the files to upload; {@code null} or empty yields an empty list
         * @return the locations of the uploaded objects, in input order
         * @throws RuntimeException if any upload fails or the thread is interrupted
         */
        public List<String> uploadFileToOss(List<MultipartFile> multipartFileList) {
            List<String> res = new ArrayList<>();
            if (multipartFileList == null || multipartFileList.isEmpty()) {
                return res;
            }

            // Connection settings are placeholders in this sample and must be filled in.
            String endPoint = "", accessKeyId = "", accessKeySecret = "", bucketName = "";
            String datePrefix = KEY_PREFIX
                    + FastDateFormat.getInstance("yyyyMMdd", Locale.CHINESE).format(new Date());

            OSSClient ossClient = OssClientFactory.createOssClient(endPoint, accessKeyId, accessKeySecret, bucketName);
            try {
                for (MultipartFile multipartFile : multipartFileList) {
                    String objectName = datePrefix + "/" + uniqueObjectSuffix(multipartFile);
                    res.add(uploadSingleFile(ossClient, bucketName, objectName, multipartFile));
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                ossClient.shutdown();
            }
            return res;
        }

        /** Runs one complete multipart upload (initiate, upload parts, complete) for a single file. */
        private String uploadSingleFile(OSSClient ossClient,
                                        String bucketName,
                                        String objectName,
                                        MultipartFile multipartFile) throws IOException, InterruptedException {
            // Initialise the multipart upload for this object.
            InitiateMultipartUploadResult initiated =
                    ossClient.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, objectName));
            String uploadId = initiated.getUploadId();

            try (InputStream inputStream = multipartFile.getInputStream()) {
                SliceInputStream sliceInputStream = new SliceInputStream(inputStream, PART_SIZE_BYTES);
                List<UploadPartRequest> uploadPartRequestList =
                        sliceInputStream.genUploadPartRequestList(bucketName, objectName, uploadId);

                List<PartETag> partETags =
                        new OssMultipleThreadUploadExecutor(ossClient).genPartTag(uploadPartRequestList);

                CompleteMultipartUploadResult completed = ossClient.completeMultipartUpload(
                        new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, partETags));
                return completed.getLocation();
            } catch (IOException | InterruptedException | RuntimeException e) {
                try {
                    // Release the parts already stored for this unfinished upload.
                    ossClient.abortMultipartUpload(
                            new AbortMultipartUploadRequest(bucketName, objectName, uploadId));
                } catch (RuntimeException abortFailure) {
                    e.addSuppressed(abortFailure);
                }
                throw e;
            }
        }

        /** Builds a per-file key suffix: a random UUID, plus the file's base name when one is available. */
        private static String uniqueObjectSuffix(MultipartFile multipartFile) {
            String suffix = java.util.UUID.randomUUID().toString();
            String original = multipartFile.getOriginalFilename();
            if (original == null) {
                return suffix;
            }
            // Keep only the last path segment of the client-supplied name.
            int cut = Math.max(original.lastIndexOf('/'), original.lastIndexOf('\\'));
            String baseName = original.substring(cut + 1).trim();
            return baseName.isEmpty() ? suffix : suffix + "-" + baseName;
        }

    }

    public class SliceInputStream {

    private final int partSize;
    private InputStream inputStream;
    
    
    public SliceInputStream(InputStream inputStream,int partSize) {
        this.inputStream = inputStream;
        this.partSize=partSize;
    }
    
    public List<UploadPartRequest> genUploadPartRequestList(String bucketName,String objectName,String uploadId
                                                            ) throws IOException {
    
        final long partSize = this.partSize;
        int fileLength = inputStream.available();
        int partCount = (int) (fileLength / partSize);
        if (fileLength % partSize != 0) {
            partCount++;
        }
        List<UploadPartRequest> uploadPartRequestList = new ArrayList<>();
        for (int i = 0; i < partCount; i++) {
            long startPos = i * partSize;
            long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
            //skip position
            inputStream.skip(startPos);
            UploadPartRequest uploadPartRequest = new UploadPartRequest();
            uploadPartRequest.setBucketName(bucketName);
            uploadPartRequest.setKey(objectName);
            uploadPartRequest.setUploadId(uploadId);
            uploadPartRequest.setInputStream(inputStream);
            uploadPartRequest.setPartSize(curPartSize);
            uploadPartRequestList.add(uploadPartRequest);
        }
        return uploadPartRequestList;
    
    }
    

    /**
     * {@link ThreadFactory} that names its threads {@code <prefix>-thread-<n>},
     * where {@code n} is a per-factory counter starting at 1.
     */
    public class NamedThreadFactory implements ThreadFactory {

        /** Sequence shared by all factories; used to build the default prefix. */
        protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

        /** Number given to the next thread created by this factory. */
        protected final AtomicInteger mThreadNum = new AtomicInteger(1);

        /** Thread name prefix, already ending in {@code "-thread-"}. */
        protected final String mPrefix;

        /** Whether created threads are marked as daemon threads. */
        protected final boolean mDaemon;

        /** Thread group assigned to every created thread. */
        protected final ThreadGroup mGroup;

        /** Creates a non-daemon factory with the prefix {@code "pool-<seq>"}. */
        public NamedThreadFactory() {
            this("pool-" + POOL_SEQ.getAndIncrement(), false);
        }

        /**
         * Creates a non-daemon factory.
         *
         * @param prefix leading part of every thread name
         */
        public NamedThreadFactory(String prefix) {
            this(prefix, false);
        }

        /**
         * @param prefix leading part of every thread name
         * @param daemon whether created threads are daemon threads
         */
        public NamedThreadFactory(String prefix, boolean daemon) {
            mPrefix = prefix + "-thread-";
            mDaemon = daemon;
            // Use the security manager's group when one is installed, else the creating thread's.
            SecurityManager s = System.getSecurityManager();
            mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
        }

        /**
         * Creates a new, unstarted thread for {@code runnable} with the next sequential name.
         *
         * @param runnable the task the thread will run
         * @return the configured thread
         */
        @Override
        public Thread newThread(Runnable runnable) {
            String name = mPrefix + mThreadNum.getAndIncrement();
            Thread ret = new Thread(mGroup, runnable, name, 0);
            ret.setDaemon(mDaemon);
            return ret;
        }

        /** @return the thread group assigned to threads created by this factory */
        public ThreadGroup getThreadGroup() {
            return mGroup;
        }

    }

    详情请看Git地址:https://github.com/LycolLoveLucy/mixedMutiple

    相关文章

      网友评论

          本文标题:Oss多线程分片方式上传文件

          本文链接:https://www.haomeiwen.com/subject/lyljwrtx.html