美文网首页
实习总结:文件迁移

实习总结:文件迁移

作者: 都让你们叫老了 | 来源:发表于2017-11-28 14:20 被阅读0次

    根据项目需求,将文件从FDFS,SASS迁移到azure。此文章记录了java对三个服务器文件上传下载简单的实现。

    @Controller
    @RequestMapping("/file")
    public class FileFloadMigrationController extends BaseController {
    
        private static Logger log = LoggerFactory.getLogger(FileServiceImpl.class);
        @Autowired
        @Qualifier("fileUploadServiceImpl")
        private FileService fileService;
    
        @Autowired
        private FileUploadedToDao fileUploadedToDao;
    
        @Autowired(required = false)
        protected MongoTemplate mongoTemplate;
    
        @Autowired
        private FileMapDao fileMapDao;
    
        @Autowired
        private FileMapService fileMapService;
    
        @Autowired
        private FileUploadedService fileUploadedService;
    
        /**
         * 文件迁移
         * 
         * @Title: fileMigration
         * @param state
         *            状态:fail:失败 upload:上传
         * @param module
         *            模块名 module=all:所有模块
         * @param lastModifyTime
         *            最后修改时间:存在该项时证明要导入新增数据
         * @return
         * @throws Exception
         */
        @RequestMapping(value = "/filefload/{state}/{module}", method = RequestMethod.GET)
        public @ResponseBody Rest fileMigration(@PathVariable String state, @PathVariable String module, String lastModifyTime) throws Exception {
            new HttpClient().getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0, false));
            List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
            List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
            Map<Integer, Object> result = new HashMap<Integer, Object>();
            // 上传失败后
            if (state.equals("fail")) {
                // 从新上传所有模块中失败数据
                if (module.equals("all")) {
                    findFileUploadedTos = getFileUploadsFail();
                } else {
                    // 按模块上传
                    findFileUploadedTos = getFileUploadsFail(module);
                }
    
                result = uploadFileFail(findFileUploadedTos);
    
            } else if (state.equals("upload")) {
                if (module.equals("all")) {
                    // 上传所有模块中数据
                    fileUploadeds = getFileUploadsNomal();
                } else {
                    // 按模块上传数据
                    fileUploadeds = getFileUploadsNomal(module);
                }
                // 最后修改时间不为空,需要上传新文件
                // 比如2017-11-09日转移一次后,11日需要在转移。此时lastModifyTime 需要写为2017-11-09
                if (lastModifyTime != null && !lastModifyTime.equals("")) {
                    fileUploadeds = getFileUploadTosByLastModify(lastModifyTime);
                }
    
                result = uploadFileNomal(fileUploadeds);
            }
    
            return Rest.item(StateCode.STATUS_CODE_SUCCESS, result);
        }
    
        /**
         * 正常情况下文件迁移,不按照模块。所有模块
         * 
         * @return List<FileUploaded>
         */
        private List<FileUploaded> getFileUploadsNomal() {
            List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
            Query query = new Query();
            fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
            return fileUploadeds;
        }
    
        /**
         * 正常情况下文件迁移,按照模块
         * 
         * @Title: getFileUploadsNomal
         * @param module
         *            模块名字
         * @return List<FileUploaded>
         */
        private List<FileUploaded> getFileUploadsNomal(String module) {
            List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
            Query query = new Query();
            Criteria criteria = Criteria.where("module").is(module);
            query.addCriteria(criteria);
            fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
            return fileUploadeds;
        }
    
        /**
         * 上传过程中出现失败情况。从新上传。所有模块
         * 
         * @Title: getFileUploadsFail
         * @return List<FileUploadedTo>
         */
        private List<FileUploadedTo> getFileUploadsFail() {
            List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
            Query query = new Query();
            Criteria criteria = Criteria.where("isSuccess").is(false);
            query.addCriteria(criteria);
            findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
            return findFileUploadedTos;
        }
    
        /**
         * 上传过程中出现失败情况。从新上传。按模块进行上传
         * 
         * @Title: getFileUploadsFail
         * @param module
         *            模块名
         * @return List<FileUploadedTo>
         */
        private List<FileUploadedTo> getFileUploadsFail(String module) {
            List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
            Query query = new Query();
            Criteria criteria = Criteria.where("fileUploaded.module").is(module).and("isSuccess").is(false);
            query.addCriteria(criteria);
            findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
            return findFileUploadedTos;
        }
    
        /**
         * 上传部分后,出现新增数据。根据文件最后更新时间,进行操作
         * 
         * @Title: getFileUploadTosByLastModify
         * @param lastModifyTime
         *            最后修改时间
         * @return List<FileUploadedTo>
         * @throws ParseException 
         */
        private List<FileUploaded> getFileUploadTosByLastModify(String lastModifyTime) throws ParseException {
            List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
            Query query = new Query();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Criteria criteria = Criteria.where("lastModify").gte(sdf.parse(lastModifyTime));
            query.addCriteria(criteria);
            fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
            return fileUploadeds;
        }
    
        /**
         * 正常情况下上传文件主方法
         * 
         * @Title: uploadFileNomal
         * @param fileUploadeds
         * @return
         * @throws Exception
         */
        private Map<Integer, Object> uploadFileNomal(List<FileUploaded> fileUploadeds) throws Exception {
            Map<Integer, Object> map = new HashMap<Integer, Object>();
            List<FileUploadedTo> fileUploadedTos = new ArrayList<FileUploadedTo>();
            if (fileUploadeds == null || fileUploadeds.size() == 0) {
                return map;
            }
            System.out.println("共有文件:" + fileUploadeds.size() + "条");
            long startTime = System.currentTimeMillis();
            // 将表中的数据复制到扩展表中(将文件所有信息复制、设置org为要迁移文件的org、设置迁移失败(false))
            for (FileUploaded fileUploaded : fileUploadeds) {
                FileUploadedTo fileUploadedTo = new FileUploadedTo();
                fileUploadedTo.setFileUploaded(fileUploaded);
                fileUploadedTo.setOrg(fileUploaded.getOrg());
                fileUploadedTo.setSuccess(false);
                fileUploadedTos.add(fileUploadedTo);
            }
            // 保存复制表
            fileUploadedToDao.save(fileUploadedTos);
    
            uploadFileCurrency(fileUploadedTos);
    
            long endTime = System.currentTimeMillis();
            System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
            return map;
        }
    
        /**
         * 上传失败后重新上传主方法
         * 
         * @Title: uploadFileFail
         * @param fileUploadedTos
         * @return
         */
        private Map<Integer, Object> uploadFileFail(List<FileUploadedTo> fileUploadedTos) throws Exception {
            Map<Integer, Object> map = new HashMap<Integer,Object>();
            
            uploadFileCurrency(fileUploadedTos);
            
            return map;
        }
    
        /**
         * 上传文件通用方法
         * 
         * @Title: uploadFileCurrency
         * @param fileUploadedTos
         */
    
        private void uploadFileCurrency(List<FileUploadedTo> fileUploadedTos) {
            int i = 0;
            for (FileUploadedTo fileUploadedTo : fileUploadedTos) {
                String path = fileUploadedTo.getFileUploaded().getPath();
                
                if (path != null && path.indexOf("group") != -1) {
                    byte fileByte[] = fileService.getFileBytes(path);
                    if (fileByte == null) {
                        System.out.println("第" + i++ + "个 fileByte不存在,path是" + path);
                    } else {
                        if (path.indexOf(".pdf.") == -1) {// 不是从文件
                            uploadFile(fileUploadedTo, fileByte);
                        } else {// 是从文件
                            uploadSlaveFile(fileByte, "file.pdf", fileUploadedTo);
                        }
                    }
                }
            }
        }
    
        /**
         * 上传文件到分布式系统,并写入文件 文件限制大小 上传的文件可以直接通过id使用特定接口访问 上传失败return null
         * 
         * @param fileBytes
         *            文件
         * @param name
         *            文件名称
         * @param module
         *            模块标示
         * @return
         */
        private void uploadFile(FileUploadedTo fileUploadedTo, byte[] fileBytes) {
            // 获取fileUpload
            FileUploaded fileUploaded = fileUploadedTo.getFileUploaded();
            // 获取模板名
            String module = fileUploaded.getModule();
            // 获取当前文件信息内的org
            String org = fileUploaded.getOrg();
            // 获取文件名
            String name = fileUploaded.getName();
            // 获取文件ID
            String fileId = fileUploaded.getId();
            if (!isLegal(module)) {
                log.error("module ilegal: 模块名称非法");
            }
            try {
                // 长度小于3报错
                if (org.length() < 4) {
                    org = "org" + org;
                }
                CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, module, name);
                // 如果上传成功会返回一个Blob对象(不为空)
                // 那么此时就将上传后的新路径以及是否上传成功状态进行更改。
                if (blob != null) {
                    // 设置更新成功
                    fileUploadedTo.setSuccess(true);
                    // 设置新路径
                    String newPath = BlobClient.getPath(org, blob.getName());
                    fileUploadedTo.setNewPath(newPath);
                    fileUploadedToDao.update(fileUploadedTo);
    
                    // 更新fileUpload路径。
                    fileUploaded.setPath(newPath);
                    mongoTemplate.save(fileUploaded);
    
                    // 更新filemap路径
                    FileMap fileMap = findByFileId(fileId);
                    if (fileMap != null) {
                        fileMap.setFilePath(newPath);
                        fileMapService.update(fileMap);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 根据fileId 获取 fileMap。
         * 
         * @Title: findByFileId
         * @param id
         *            文件ID
         * @return
         * @throws
         */
        private FileMap findByFileId(String id) {
            Query query = new Query(Criteria.where("fileId").is(id));
            return fileMapDao.findOne(query);
        }
    
        /**
         * 校验模块是否合法<br/>
         * 模块名在properties中存在即合法
         * 
         * @param module
         *            模块名称
         * @return 合法返回true ,非法返回false
         */
        private boolean isLegal(String module) {
            if (module.indexOf('.') >= 0) {
                return false;
            }
            String v = PropertiesUtils.getString("file.path." + module);
            if (v == null || v.length() < 1) {
                return false;
            } else {
                return true;
            }
        }
    
        /**
         * 上传从文件主方法
         * 
         * @Title: uploadSlaveFile
         * @param fileBytes
         *            从文件字节大小
         * @param prefix_name
         *            从文件前缀
         * @param fileUploadedTo
         *            fileUploaded 复制表
         */
        private void uploadSlaveFile(byte[] fileBytes, String prefix_name, FileUploadedTo fileUploadedTo) {
            String org = fileUploadedTo.getFileUploaded().getOrg();
            FileUploaded master = fileUploadedTo.getFileUploaded();
            String preName = master.getName().substring(0, master.getName().lastIndexOf("."));
            prefix_name = BlobClient.getPath(preName.concat("_cut"), preName.concat(".pdf"));
            System.out.println(prefix_name);
            try {
                CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, master.getModule(), prefix_name);
                if (blob != null) {
                    fileUploadedTo.setSuccess(true);
                }
                fileUploadedTo.setNewPath(BlobClient.getPath(org, blob.getName()));
            } catch (InvalidKeyException | URISyntaxException | StorageException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            fileUploadedToDao.update(fileUploadedTo);
        }
    }
    
    

    上面的类首先获取数据库中的文件,然后根据文件的路径获取文件的字节。如果路径符合规范且字节不为空,那么就将该文件上传。下面的方法是上传到Azure的方法。

    Azure Storage 是微软 Azure 云提供的云端存储解决方案,当前支持的存储类型有 Blob、Queue、File 和 Table。

    Azure Blob Storage 是用来存放大量的像文本、图片、视频等非结构化数据的存储服务。我们可以在任何地方通过互联网协议 http 或者 https 访问 Blob Storage。简单说,就是把文件放在云上,给它一个 URL,通过这个 URL 来访问文件。这就涉及到一个问题:如何控制访问权限?答案是我们可以根据自己的需要,设置 Blob 对象是只能被自己访问,还是可以被所有人访问。

    下面是 Blog Storage 典型的应用场景:

    • 存储图片和文档,这些文件可以直接通过浏览器访问。
    • 支持分布式访问,主要用于 cdn。
    • 提供视频、音频流。
    • 存储基本的文件备份和归档文件。
        static final String connectionString = String.format(
                "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.chinacloudapi.cn",
                storageAccountName, storageAccountKey);
        /**
         * 通过容器的方式实现上传二进制文件
         * @throws InvalidKeyException 
         * @time:2017年9月22日 上午10:40:19
         */
        public static CloudBlockBlob uploadFile(byte[] fileBytes,String org,String module,String fileName) throws URISyntaxException, StorageException, IOException, InvalidKeyException{
            // CloudStorageAccount 类表示一个 Azure Storage Account,我们需要先创建它的实例,才能访问属于它的资源。
            // 注意连接字符串中的xxx和yyy,分别对应Access keys中的Storage account name 和 key。
            CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
    
            // Create the blob client object.
            CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
            CloudBlobContainer container = blobClient.getContainerReference(org);
            container.createIfNotExists(); // 创建一个容器(如果该容器不存在)
            // 保存到azure上的文件的名称,由于azure服务器对重名文件进行覆盖处理,
            // 上传时对文件名称进行唯一标识处理,缺点是在文件服务器看不到文件的真识名称了。
            UUID token = UUID.randomUUID();
            int postFixIndex = fileName.lastIndexOf(".");
            String fileFullName = "";
            String postFix = "";
            String startName = "";
            if (postFixIndex > 0) {
                postFix = fileName.substring(postFixIndex + 1);
                startName = fileName.substring(0, postFixIndex);
            }else {
                startName = fileName;
            }
    
            /*azure服务器存储相同文件名的文件会覆盖之前的,因此需要对文件名进行处理
            存储到azure上的文件名格试为:原文件名+"_"+uuid+扩展名
            */
            fileFullName = startName.concat("_").concat(token.toString()).concat(".").concat(postFix);
    
    
            /*getPath(module,fileFullName) 为放在 container 中的 Blob 的连接路径。
              getBlockBlobReference 方法获得一个 Block 类型的 Blob 对象的引用。
              您可以根据应用的需要,分别调用 getBlobReference,getAppendBlobReference 或 getPageBlobReference 来创建不同类型的 Blob 对象。
            */
            CloudBlockBlob blob = container.getBlockBlobReference(getPath(module,fileFullName));
            try
            {
                InputStream stream = new ByteArrayInputStream(fileBytes);
              //blob.upload(stream, (long) fileBytes.length);
                PutBlock(blob,stream);
            }
            catch (StorageException e)
            {
                return null;
            }
          return blob;
        }
    

    大文件分块上传:

      /**
        *文件过大,分块上传
        * @Title: PutBlock
        * @param blob
        * @param stream
        * @throws StorageException
        * @throws IOException
         */
    @SuppressWarnings({ "unchecked", "rawtypes" })
        public static void PutBlock(CloudBlockBlob blob,InputStream stream) throws StorageException, IOException  
        {  
            Iterable<BlockEntry> blockList = new ArrayList<BlockEntry>();
            int len = stream.available();
            int bytesRead = 0;
            int cur_read_len = STEP_LENGTH;
            byte[] b = null;
            int index = 0;
            if (len <= STEP_LENGTH) {// 如果文件没有超过设定长度,一次上传上即可
                blob.upload(stream, (long) len);
            } else {// 如果文件太大,分批上传
                while (true) {
                    if (len - bytesRead > STEP_LENGTH) {
                        cur_read_len = STEP_LENGTH;
                    } else {
                        cur_read_len = len - bytesRead;
                    }
                    b = new byte[cur_read_len + 1];
                    int bytesReadLength = 0;
                    bytesReadLength = stream.read(b, 0, cur_read_len);
                    if (bytesReadLength == -1){// end of InputStream
                        blob.commitBlockList(blockList);
                        break;
                    }
                    bytesRead += bytesReadLength;
                    if (bytesRead <= len) {
                        try {
                            String blockId = Base64.getEncoder().encodeToString(String.format("%08d",index).getBytes(StandardCharsets.UTF_8));
                            blob.uploadBlock(blockId, new ByteArrayInputStream(b),Long.valueOf(bytesReadLength));
                            ((AbstractCollection) blockList).add(new BlockEntry(blockId));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        
                    } 
                    index++;
                }
            }
          
        }
    

    相关文章

      网友评论

          本文标题:实习总结:文件迁移

          本文链接:https://www.haomeiwen.com/subject/rausbxtx.html