A Look at Flink's BlobStoreService


Author: go4it | Published: 2019-03-01 09:23

    This article takes a look at Flink's BlobStoreService and the interfaces and classes around it.

    BlobView

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java

    public interface BlobView {
    
        /**
         * Copies a blob to a local file.
         *
         * @param jobId     ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
         * @param blobKey   The blob ID
         * @param localFile The local file to copy to
         *
         * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
         * @throws IOException If the copy fails
         */
        boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
    }
    
    • BlobView defines a single get method, which copies the specified blob to localFile

    BlobStore

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java

    public interface BlobStore extends BlobView {
    
        /**
         * Copies the local file to the blob store.
         *
         * @param localFile The file to copy
         * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
         * @param blobKey   The ID for the file in the blob store
         *
         * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
         * @throws IOException If the copy fails
         */
        boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;
    
        /**
         * Tries to delete a blob from storage.
         *
         * <p>NOTE: This also tries to delete any created directories if empty.</p>
         *
         * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
         * @param blobKey The blob ID
         *
         * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
         *          <tt>false</tt> otherwise
         */
        boolean delete(JobID jobId, BlobKey blobKey);
    
        /**
         * Tries to delete all blobs for the given job from storage.
         *
         * <p>NOTE: This also tries to delete any created directories if empty.</p>
         *
         * @param jobId The JobID part of all blobs to delete
         *
         * @return  <tt>true</tt> if the job directory is successfully deleted or non-existing;
         *          <tt>false</tt> otherwise
         */
        boolean deleteAll(JobID jobId);
    }
    
    • BlobStore extends BlobView and adds the put, delete, and deleteAll methods

    BlobStoreService

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java

    public interface BlobStoreService extends BlobStore, Closeable {
    
        /**
         * Closes and cleans up the store. This entails the deletion of all blobs.
         */
        void closeAndCleanupAllData();
    }
    
    • BlobStoreService extends both BlobStore and Closeable and adds the closeAndCleanupAllData method; it has two implementations, VoidBlobStore and FileSystemBlobStore
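    To make the three-level hierarchy easier to see in one place, here is a minimal sketch that mirrors it with simplified stand-in types. Everything in it is an assumption made for illustration: SimpleBlobView, SimpleBlobStore, SimpleBlobStoreService, and InMemoryBlobStore are hypothetical names, job IDs and blob keys are plain Strings, and blobs are byte arrays held in a map rather than files. It is not Flink code.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BlobView: read-only access to blobs.
interface SimpleBlobView {
    /** Returns the blob content, or null if the blob is not present. */
    byte[] get(String jobId, String blobKey);
}

// Hypothetical stand-in for BlobStore: adds the mutating operations.
interface SimpleBlobStore extends SimpleBlobView {
    boolean put(byte[] data, String jobId, String blobKey);
    boolean delete(String jobId, String blobKey);
    boolean deleteAll(String jobId);
}

// Hypothetical stand-in for BlobStoreService: adds lifecycle management.
interface SimpleBlobStoreService extends SimpleBlobStore, Closeable {
    void closeAndCleanupAllData();
}

// Illustrative implementation that keeps blobs in memory under "jobId/blobKey".
class InMemoryBlobStore implements SimpleBlobStoreService {
    private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

    private static String path(String jobId, String blobKey) {
        return jobId + "/" + blobKey;
    }

    @Override
    public byte[] get(String jobId, String blobKey) {
        return blobs.get(path(jobId, blobKey));
    }

    @Override
    public boolean put(byte[] data, String jobId, String blobKey) {
        blobs.put(path(jobId, blobKey), data.clone());
        return true;
    }

    @Override
    public boolean delete(String jobId, String blobKey) {
        // A missing blob still counts as success, like the documented contract.
        blobs.remove(path(jobId, blobKey));
        return true;
    }

    @Override
    public boolean deleteAll(String jobId) {
        blobs.keySet().removeIf(key -> key.startsWith(jobId + "/"));
        return true;
    }

    @Override
    public void closeAndCleanupAllData() {
        blobs.clear();
    }

    @Override
    public void close() {
        // Nothing to release for an in-memory map.
    }

    int size() {
        return blobs.size();
    }
}
```

    Code that only needs to read blobs can depend on the narrowest interface (the view), while the component that owns the store's lifecycle holds the full service type.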

    VoidBlobStore

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java

    public class VoidBlobStore implements BlobStoreService {
    
        @Override
        public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
            return false;
        }
    
        @Override
        public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
            return false;
        }
    
        @Override
        public boolean delete(JobID jobId, BlobKey blobKey) {
            return true;
        }
    
        @Override
        public boolean deleteAll(JobID jobId) {
            return true;
        }
    
        @Override
        public void closeAndCleanupAllData() {}
    
        @Override
        public void close() throws IOException {}
    }
    
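    • VoidBlobStore implements BlobStoreService as a no-op: put and get return false, delete and deleteAll return true, and closeAndCleanupAllData and close do nothing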
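    The return values matter to callers: false from get means "nothing was copied", while true from delete is consistent with the interface contract that a non-existing blob counts as deleted. The sketch below shows one way a caller might react to this. MiniBlobStore, NoOpBlobStore, and BlobFetcher.fetchOrFail are hypothetical names invented for this example (with String IDs instead of JobID and BlobKey); the sketch does not claim to show how Flink's own callers behave.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical, trimmed-down store interface with String IDs.
interface MiniBlobStore {
    boolean put(File localFile, String jobId, String blobKey) throws IOException;
    boolean get(String jobId, String blobKey, File localFile) throws IOException;
    boolean delete(String jobId, String blobKey);
}

// No-op store in the style of VoidBlobStore: callers never need a null check.
final class NoOpBlobStore implements MiniBlobStore {
    @Override
    public boolean put(File localFile, String jobId, String blobKey) {
        return false; // nothing was stored
    }

    @Override
    public boolean get(String jobId, String blobKey, File localFile) {
        return false; // nothing was copied
    }

    @Override
    public boolean delete(String jobId, String blobKey) {
        return true; // "non-existing" counts as deleted
    }
}

// Hypothetical caller that treats a false result from get as "not available".
final class BlobFetcher {
    static File fetchOrFail(MiniBlobStore store, String jobId, String blobKey, File target)
            throws IOException {
        if (store.get(jobId, blobKey, target)) {
            return target;
        }
        throw new FileNotFoundException("Blob " + blobKey + " is not available in the store");
    }
}
```

    Because the no-op store is a real object, code paths that use a store look the same whether or not a persistent store is configured.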

    FileSystemBlobStore

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java

    public class FileSystemBlobStore implements BlobStoreService {
    
        private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
    
        /** The file system in which blobs are stored. */
        private final FileSystem fileSystem;
    
        /** The base path of the blob store. */
        private final String basePath;
    
        public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
            this.fileSystem = checkNotNull(fileSystem);
            this.basePath = checkNotNull(storagePath) + "/blob";
    
            LOG.info("Creating highly available BLOB storage directory at {}", basePath);
    
            fileSystem.mkdirs(new Path(basePath));
            LOG.debug("Created highly available BLOB storage directory at {}", basePath);
        }
    
        // - Put ------------------------------------------------------------------
    
        @Override
        public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
            return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
        }
    
        private boolean put(File fromFile, String toBlobPath) throws IOException {
            try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
                LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
                Files.copy(fromFile, os);
            }
            return true;
        }
    
        // - Get ------------------------------------------------------------------
    
        @Override
        public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
            return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
        }
    
        private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
            checkNotNull(fromBlobPath, "Blob path");
            checkNotNull(toFile, "File");
            checkNotNull(blobKey, "Blob key");
    
            if (!toFile.exists() && !toFile.createNewFile()) {
                throw new IOException("Failed to create target file to copy to");
            }
    
            final Path fromPath = new Path(fromBlobPath);
            MessageDigest md = BlobUtils.createMessageDigest();
    
            final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for chunked file copying
    
            boolean success = false;
            try (InputStream is = fileSystem.open(fromPath);
                FileOutputStream fos = new FileOutputStream(toFile)) {
                LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
    
                // not using IOUtils.copyBytes(is, fos) here to be able to create a hash on-the-fly
                final byte[] buf = new byte[buffSize];
                int bytesRead = is.read(buf);
                while (bytesRead >= 0) {
                    fos.write(buf, 0, bytesRead);
                    md.update(buf, 0, bytesRead);
    
                    bytesRead = is.read(buf);
                }
    
                // verify that file contents are correct
                final byte[] computedKey = md.digest();
                if (!Arrays.equals(computedKey, blobKey.getHash())) {
                    throw new IOException("Detected data corruption during transfer");
                }
    
                success = true;
            } finally {
                // if the copy fails, we need to remove the target file because
                // outside code relies on a correct file as long as it exists
                if (!success) {
                    try {
                        toFile.delete();
                    } catch (Throwable ignored) {}
                }
            }
    
            return true; // success is always true here
        }
    
        // - Delete ---------------------------------------------------------------
    
        @Override
        public boolean delete(JobID jobId, BlobKey blobKey) {
            return delete(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
        }
    
        @Override
        public boolean deleteAll(JobID jobId) {
            return delete(BlobUtils.getStorageLocationPath(basePath, jobId));
        }
    
        private boolean delete(String blobPath) {
            try {
                LOG.debug("Deleting {}.", blobPath);
    
                Path path = new Path(blobPath);
    
                boolean result = fileSystem.delete(path, true);
    
                // send a call to delete the directory containing the file. This will
                // fail (and be ignored) when some files still exist.
                try {
                    fileSystem.delete(path.getParent(), false);
                    fileSystem.delete(new Path(basePath), false);
                } catch (IOException ignored) {}
                return result;
            }
            catch (Exception e) {
                LOG.warn("Failed to delete blob at " + blobPath);
                return false;
            }
        }
    
        @Override
        public void closeAndCleanupAllData() {
            try {
                LOG.debug("Cleaning up {}.", basePath);
    
                fileSystem.delete(new Path(basePath), true);
            }
            catch (Exception e) {
                LOG.error("Failed to clean up recovery directory.", e);
            }
        }
    
        @Override
        public void close() throws IOException {
            // nothing to do for the FileSystemBlobStore
        }
    }
    
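    • FileSystemBlobStore implements BlobStoreService. Its constructor takes a fileSystem and a storagePath, sets basePath to storagePath + "/blob", and creates that directory. put opens the target OutputStream via fileSystem.create (in OVERWRITE mode) and copies localFile to toBlobPath with Files.copy. get opens the blob via fileSystem.open and writes it to localFile in 4096-byte chunks while updating a MessageDigest; if the digest does not match blobKey.getHash() it throws an IOException, and on any failure the target file is deleted. delete and deleteAll resolve the blobPath via BlobUtils.getStorageLocationPath and call fileSystem.delete recursively, then make a best-effort, non-recursive attempt to remove the parent directory and basePath if they are empty. closeAndCleanupAllData calls fileSystem.delete to recursively remove basePath (that is, storagePath + "/blob", not storagePath itself)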
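    The private get method above combines copying with integrity checking: it hashes the bytes as they are written and removes the target file if anything goes wrong. The same idea can be sketched with only the JDK. VerifiedCopy and its methods are hypothetical names, local java.nio paths stand in for Flink's FileSystem, and SHA-1 is an assumption made for this example (the source only shows that the digest comes from BlobUtils.createMessageDigest()).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

final class VerifiedCopy {

    /** Copies src to dst while hashing; removes dst if the copy or the check fails. */
    static void copyAndVerify(Path src, Path dst, byte[] expectedHash) throws IOException {
        MessageDigest md = newDigest();
        boolean success = false;
        try (InputStream in = Files.newInputStream(src);
             OutputStream out = Files.newOutputStream(dst)) {
            byte[] buf = new byte[4096];
            int bytesRead;
            while ((bytesRead = in.read(buf)) >= 0) {
                out.write(buf, 0, bytesRead);
                md.update(buf, 0, bytesRead); // hash on the fly, no second pass
            }
            if (!Arrays.equals(md.digest(), expectedHash)) {
                throw new IOException("Detected data corruption during transfer");
            }
            success = true;
        } finally {
            // Both streams are already closed here; never leave a bad file behind.
            if (!success) {
                try {
                    Files.deleteIfExists(dst);
                } catch (IOException ignored) {
                    // best effort, keep the original exception
                }
            }
        }
    }

    /** Helper for callers that need the expected hash of known content. */
    static byte[] sha1(byte[] data) {
        return newDigest().digest(data);
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1"); // assumed algorithm
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    Hashing inside the copy loop avoids reading the file a second time, and deleting the target on failure means that a file which exists can be trusted by outside code, which is the reason given in the original comment.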

    Summary

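    • BlobView defines the get method, which copies the specified blob to localFile; BlobStore extends BlobView and adds the put, delete, and deleteAll methods
    • BlobStoreService extends both BlobStore and Closeable and adds the closeAndCleanupAllData method; it has two implementations, VoidBlobStore and FileSystemBlobStore
    • VoidBlobStore implements BlobStoreService as a no-op. FileSystemBlobStore implements BlobStoreService on top of a fileSystem and a storagePath passed to its constructor: put opens the target OutputStream via fileSystem.create and copies localFile to toBlobPath with Files.copy; get opens the blob via fileSystem.open, writes it to localFile, and verifies its hash against the blobKey; delete and deleteAll resolve the blobPath via BlobUtils.getStorageLocationPath and call fileSystem.delete; closeAndCleanupAllData calls fileSystem.delete to recursively remove basePath, which is storagePath + "/blob"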
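    One detail of the private delete helper in FileSystemBlobStore is easy to miss: after removing the blob it also issues non-recursive deletes for the parent directory and for basePath, and ignores the failure when those directories are not empty. A rough JDK-only sketch of that best-effort cleanup follows. BlobCleanup.deleteBlob is a hypothetical name, local java.nio paths stand in for Flink's FileSystem, and the sketch handles a single blob file rather than a recursive delete.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class BlobCleanup {

    /**
     * Deletes one blob file, then tries to remove its parent directory and
     * basePath if they have become empty. Assumes blobPath lies below basePath.
     *
     * @return true if the blob was deleted or did not exist, false otherwise
     */
    static boolean deleteBlob(Path basePath, Path blobPath) {
        try {
            Files.deleteIfExists(blobPath);
        } catch (IOException e) {
            return false;
        }
        // Best effort: deleting a non-empty directory throws
        // DirectoryNotEmptyException (an IOException), which is ignored.
        try {
            Files.deleteIfExists(blobPath.getParent());
            Files.deleteIfExists(basePath);
        } catch (IOException ignored) {
            // some blobs still exist, so the directories stay
        }
        return true;
    }
}
```

    With two job directories under the same base, deleting the only blob of the first job removes that job's directory but leaves the base in place; the base disappears once the last blob of the last job is deleted.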
