Talking about Flink's BlobWriter

Author: go4it | Published 2019-02-28 10:30

    This article takes a look at Flink's BlobWriter.

    BlobWriter

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java

    /**
     * BlobWriter is used to upload data to the BLOB store.
     */
    public interface BlobWriter {
    
        Logger LOG = LoggerFactory.getLogger(BlobWriter.class);
    
        /**
         * Uploads the data of the given byte array for the given job to the BLOB server and makes it
         * a permanent BLOB.
         *
         * @param jobId
         *      the ID of the job the BLOB belongs to
         * @param value
         *      the buffer to upload
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
         *      store
         */
        PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;
    
        /**
         * Uploads the data from the given input stream for the given job to the BLOB server and makes it
         * a permanent BLOB.
         *
         * @param jobId
         *      ID of the job this blob belongs to
         * @param inputStream
         *      the input stream to read the data from
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while reading the data from the input stream, writing it to a
         *      local file, or uploading it to the HA store
         */
        PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;
    
        /**
         * Returns the min size before data will be offloaded to the BLOB store.
         *
         * @return minimum offloading size
         */
        int getMinOffloadingSize();
    
        /**
         * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
         * offloading size of the BlobServer.
         *
         * @param value to serialize
         * @param jobId to which the value belongs.
         * @param blobWriter to use to offload the serialized value
         * @param <T> type of the value to serialize
         * @return Either the serialized value or the stored blob key
         * @throws IOException if the data cannot be serialized
         */
        static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
                T value,
                JobID jobId,
                BlobWriter blobWriter) throws IOException {
            Preconditions.checkNotNull(value);
            Preconditions.checkNotNull(jobId);
            Preconditions.checkNotNull(blobWriter);
    
            final SerializedValue<T> serializedValue = new SerializedValue<>(value);
    
            if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
                return Either.Left(new SerializedValue<>(value));
            } else {
                try {
                    final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());
    
                    return Either.Right(permanentBlobKey);
                } catch (IOException e) {
                    LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);
    
                    return Either.Left(serializedValue);
                }
            }
        }
    }
    
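    • BlobWriter defines the putPermanent and getMinOffloadingSize methods. It also provides the static method serializeAndTryOffload. This method serializes the given value. If the serialized size is at least the minimum offloading size, it calls blobWriter.putPermanent to store the value in the BLOB store and returns the PermanentBlobKey (Either.Right). Smaller values, and values whose upload fails with an IOException, are returned inline as a SerializedValue (Either.Left).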
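The following is a minimal, self-contained Java sketch of the decision logic in serializeAndTryOffload. It does not use Flink. SimpleBlobWriter, InMemoryBlobWriter and OffloadResult are hypothetical stand-ins for BlobWriter, a BlobServer, and Either<SerializedValue<T>, PermanentBlobKey>. Plain Java serialization replaces SerializedValue. One deliberate difference from the 1.7.2 code above: the sketch serializes the value once and reuses those bytes in the inline branch, whereas the quoted code builds a second SerializedValue there.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class OffloadSketch {

    /** Hypothetical, simplified stand-in for Flink's BlobWriter (not the real interface). */
    interface SimpleBlobWriter {
        String putPermanent(String jobId, byte[] value) throws IOException;
        int getMinOffloadingSize();
    }

    /** Hypothetical Either replacement: inline bytes or the key of the offloaded blob. */
    static final class OffloadResult {
        final byte[] inlineBytes; // non-null when the value stays inline
        final String blobKey;     // non-null when the value was offloaded

        private OffloadResult(byte[] inlineBytes, String blobKey) {
            this.inlineBytes = inlineBytes;
            this.blobKey = blobKey;
        }

        boolean isInline() {
            return inlineBytes != null;
        }
    }

    /** Toy in-memory writer; failUploads simulates an upload IOException. */
    static final class InMemoryBlobWriter implements SimpleBlobWriter {
        private final Map<String, byte[]> store = new HashMap<>();
        private final int minOffloadingSize;
        boolean failUploads = false;

        InMemoryBlobWriter(int minOffloadingSize) {
            this.minOffloadingSize = minOffloadingSize;
        }

        @Override
        public String putPermanent(String jobId, byte[] value) throws IOException {
            if (failUploads) {
                throw new IOException("simulated upload failure");
            }
            String key = jobId + "/" + UUID.randomUUID();
            store.put(key, value.clone());
            return key;
        }

        @Override
        public int getMinOffloadingSize() {
            return minOffloadingSize;
        }

        int size() {
            return store.size();
        }
    }

    /** Plain Java serialization, used here instead of Flink's SerializedValue. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    /** Inline if smaller than the threshold, otherwise offload; fall back to inline on failure. */
    static OffloadResult serializeAndTryOffload(Serializable value, String jobId, SimpleBlobWriter writer)
            throws IOException {
        byte[] bytes = serialize(value); // serialized once, reused in every branch
        if (bytes.length < writer.getMinOffloadingSize()) {
            return new OffloadResult(bytes, null);
        }
        try {
            return new OffloadResult(null, writer.putPermanent(jobId, bytes));
        } catch (IOException e) {
            // Upload failed: ship the value inline instead of failing the caller
            return new OffloadResult(bytes, null);
        }
    }
}
```

Because of the fallback branch, a failing BLOB store only costs a larger inline payload. It does not cause a hard error.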

    BlobServer

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

    /**
     * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
     * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
     * the BLOBs or temporarily cache them.
     */
    public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
        //......
    
        @Override
        public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
            checkNotNull(jobId);
            return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
        }
    
        @Override
        public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
            checkNotNull(jobId);
            return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
        }
    
        /**
         * Returns the configuration used by the BLOB server.
         *
         * @return configuration
         */
        @Override
        public final int getMinOffloadingSize() {
            return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
        }
    
        /**
         * Uploads the data of the given byte array for the given job to the BLOB server.
         *
         * @param jobId
         *      the ID of the job the BLOB belongs to
         * @param value
         *      the buffer to upload
         * @param blobType
         *      whether to make the data permanent or transient
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
         *      store
         */
        private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
                throws IOException {
    
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", jobId);
            }
    
            File incomingFile = createTemporaryFilename();
            MessageDigest md = BlobUtils.createMessageDigest();
            BlobKey blobKey = null;
            try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
                md.update(value);
                fos.write(value);
            } catch (IOException ioe) {
                // delete incomingFile from a failed download
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for job {}.",
                        incomingFile, jobId);
                }
                throw ioe;
            }
    
            try {
                // persist file
                blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
    
                return blobKey;
            } finally {
                // delete incomingFile from a failed download
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
                        incomingFile, blobKey, jobId);
                }
            }
        }
    
        /**
         * Uploads the data from the given input stream for the given job to the BLOB server.
         *
         * @param jobId
         *      the ID of the job the BLOB belongs to
         * @param inputStream
         *      the input stream to read the data from
         * @param blobType
         *      whether to make the data permanent or transient
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while reading the data from the input stream, writing it to a
         *      local file, or uploading it to the HA store
         */
        private BlobKey putInputStream(
                @Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
                throws IOException {
    
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", jobId);
            }
    
            File incomingFile = createTemporaryFilename();
            MessageDigest md = BlobUtils.createMessageDigest();
            BlobKey blobKey = null;
            try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
                // read stream
                byte[] buf = new byte[BUFFER_SIZE];
                while (true) {
                    final int bytesRead = inputStream.read(buf);
                    if (bytesRead == -1) {
                        // done
                        break;
                    }
                    fos.write(buf, 0, bytesRead);
                    md.update(buf, 0, bytesRead);
                }
    
                // persist file
                blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
    
                return blobKey;
            } finally {
                // delete incomingFile from a failed download
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
                        incomingFile, blobKey, jobId);
                }
            }
        }
    
        /**
         * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
         * use.
         *
         * @param incomingFile
         *      temporary file created during transfer
         * @param jobId
         *      ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
         * @param digest
         *      BLOB content digest, i.e. hash
         * @param blobType
         *      whether this file is a permanent or transient BLOB
         *
         * @return unique BLOB key that identifies the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while moving the file or uploading it to the HA store
         */
        BlobKey moveTempFileToStore(
                File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
                throws IOException {
    
            int retries = 10;
    
            int attempt = 0;
            while (true) {
                // add unique component independent of the BLOB content
                BlobKey blobKey = BlobKey.createKey(blobType, digest);
                File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
    
                // try again until the key is unique (put the existence check into the lock!)
                readWriteLock.writeLock().lock();
                try {
                    if (!storageFile.exists()) {
                        BlobUtils.moveTempFileToStore(
                            incomingFile, jobId, blobKey, storageFile, LOG,
                            blobKey instanceof PermanentBlobKey ? blobStore : null);
                        // add TTL for transient BLOBs:
                        if (blobKey instanceof TransientBlobKey) {
                            // must be inside read or write lock to add a TTL
                            blobExpiryTimes
                                .put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
                                    System.currentTimeMillis() + cleanupInterval);
                        }
                        return blobKey;
                    }
                } finally {
                    readWriteLock.writeLock().unlock();
                }
    
                ++attempt;
                if (attempt >= retries) {
                    String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
                    LOG.error(message + " No retries left.");
                    throw new IOException(message);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
                            jobId, attempt, storageFile.getAbsolutePath());
                    }
                }
            }
        }
    
        /**
         * Returns a temporary file inside the BLOB server's incoming directory.
         *
         * @return a temporary file inside the BLOB server's incoming directory
         *
         * @throws IOException
         *      if creating the directory fails
         */
        File createTemporaryFilename() throws IOException {
            return new File(BlobUtils.getIncomingDirectory(storageDir),
                    String.format("temp-%08d", tempFileCounter.getAndIncrement()));
        }
    
        //......
    }
    
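    • BlobServer implements the BlobWriter interface. Its two putPermanent overloads delegate to putBuffer and putInputStream respectively, passing the PERMANENT_BLOB type. getMinOffloadingSize reads BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration; the default is 1 MiB.
    • putBuffer takes a byte[]. It writes the array to a temporary file in the incoming directory while feeding the same bytes to a MessageDigest, then calls moveTempFileToStore to persist the file. putInputStream takes an InputStream and follows the same pattern: it copies the stream into a temporary file in BUFFER_SIZE chunks, updating the digest as it goes, and then calls moveTempFileToStore.
    • moveTempFileToStore first creates a BlobKey from the blob type and the content digest; the key also carries a random component. It then resolves storageFile via BlobUtils.getStorageLocation(storageDir, jobId, blobKey). While holding the write lock, if storageFile does not exist yet, it calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location, passing the HA blobStore only for permanent keys. Otherwise it retries with a new key, up to 10 attempts. storageDir itself is initialized by BlobUtils.initLocalStorageDirectory(config).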
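The write path described above can be sketched with plain Java NIO. It has three steps: copy the data to a temporary file while hashing it, move the file into a key-derived location under a write lock, and retry with a fresh key if that location is taken. This is a simplified illustration, not Flink's code. The class BlobPutSketch, the incoming/storage directory names, the 4 KiB buffer, SHA-1 as the digest algorithm, and the key format (hex digest plus a random UUID) are all assumptions; Flink's real BlobKey encoding differs. The HA BlobStore upload and the TTL for transient blobs are left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BlobPutSketch {
    private static final int BUFFER_SIZE = 4096; // assumption, not Flink's value
    private static final int MAX_RETRIES = 10;   // same retry budget as the quoted code

    private final Path incomingDir;
    private final Path storageDir;
    private final AtomicLong tempFileCounter = new AtomicLong();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    BlobPutSketch(Path root) throws IOException {
        this.incomingDir = Files.createDirectories(root.resolve("incoming")); // hypothetical layout
        this.storageDir = Files.createDirectories(root.resolve("storage"));
    }

    /** Copies the stream to a temp file, hashing as it goes, then moves it into the store. */
    Path putInputStream(String jobId, InputStream in) throws IOException {
        Path incoming = incomingDir.resolve(String.format("temp-%08d", tempFileCounter.getAndIncrement()));
        MessageDigest md = newDigest();
        try {
            try (OutputStream out = Files.newOutputStream(incoming)) {
                byte[] buf = new byte[BUFFER_SIZE];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    md.update(buf, 0, n);
                }
            }
            return moveToStore(incoming, jobId, md.digest());
        } finally {
            Files.deleteIfExists(incoming); // no-op after a successful move
        }
    }

    private Path moveToStore(Path incoming, String jobId, byte[] digest) throws IOException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            // Hypothetical key: content hash + random part, so each retry gets a new candidate
            String key = toHex(digest) + "-" + UUID.randomUUID();
            Path target = storageDir.resolve("job_" + jobId).resolve("blob_" + key);
            Files.createDirectories(target.getParent());
            lock.writeLock().lock();
            try {
                if (!Files.exists(target)) { // existence check inside the lock
                    Files.move(incoming, target);
                    return target;
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
        throw new IOException("Failed to find a unique key for BLOB of job " + jobId);
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1"); // assumed digest algorithm
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Hashing while copying means the data is read only once. This also explains why both put methods stage data in a local file before the key is known.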

    BlobUtils

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java

    /**
     * Utility class to work with blob data.
     */
    public class BlobUtils {
        //......
    
        /**
         * Creates a local storage directory for a blob service under the configuration parameter given
         * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
         * fall back to Flink's temp directories (given by
         * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
         * random.
         *
         * @param config
         *      Flink configuration
         *
         * @return a new local storage directory
         *
         * @throws IOException
         *      thrown if the local file storage cannot be created or is not usable
         */
        static File initLocalStorageDirectory(Configuration config) throws IOException {
    
            String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
    
            File baseDir;
            if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
                final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
                baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
            }
            else {
                baseDir = new File(basePath);
            }
    
            File storageDir;
    
            // NOTE: although we will be using UUIDs, there may be collisions
            int maxAttempts = 10;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                storageDir = new File(baseDir, String.format(
                        "blobStore-%s", UUID.randomUUID().toString()));
    
                // Create the storage dir if it doesn't exist. Only return it when the operation was
                // successful.
                if (storageDir.mkdirs()) {
                    return storageDir;
                }
            }
    
            // max attempts exceeded to find a storage directory
            throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
        }
    
        /**
         * Returns the (designated) physical storage location of the BLOB with the given key.
         *
         * @param storageDir
         *      storage directory used be the BLOB service
         * @param key
         *      the key identifying the BLOB
         * @param jobId
         *      ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
         *
         * @return the (designated) physical storage location of the BLOB
         *
         * @throws IOException
         *      if creating the directory fails
         */
        static File getStorageLocation(
                File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
            File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
    
            Files.createDirectories(file.getParentFile().toPath());
    
            return file;
        }
    
        /**
         * Returns the path for the given blob key.
         *
         * <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
         * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
         * BlobKey)}.
         *
         * @param storageDir
         *      storage directory used be the BLOB service
         * @param key
         *      the key identifying the BLOB
         * @param jobId
         *      ID of the job for the incoming files
         *
         * @return the path to the given BLOB
         */
        static String getStorageLocationPath(
                String storageDir, @Nullable JobID jobId, BlobKey key) {
            if (jobId == null) {
                // format: $base/no_job/blob_$key
                return String.format("%s/%s/%s%s",
                    storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
            } else {
                // format: $base/job_$jobId/blob_$key
                return String.format("%s/%s%s/%s%s",
                    storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
            }
        }
    
        /**
         * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
         * use (not thread-safe!).
         *
         * @param incomingFile
         *      temporary file created during transfer
         * @param jobId
         *      ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
         * @param blobKey
         *      BLOB key identifying the file
         * @param storageFile
         *      (local) file where the blob is/should be stored
         * @param log
         *      logger for debug information
         * @param blobStore
         *      HA store (or <tt>null</tt> if unavailable)
         *
         * @throws IOException
         *      thrown if an I/O error occurs while moving the file or uploading it to the HA store
         */
        static void moveTempFileToStore(
                File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
                Logger log, @Nullable BlobStore blobStore) throws IOException {
    
            try {
                // first check whether the file already exists
                if (!storageFile.exists()) {
                    try {
                        // only move the file if it does not yet exist
                        Files.move(incomingFile.toPath(), storageFile.toPath());
    
                        incomingFile = null;
    
                    } catch (FileAlreadyExistsException ignored) {
                        log.warn("Detected concurrent file modifications. This should only happen if multiple" +
                            "BlobServer use the same storage directory.");
                        // we cannot be sure at this point whether the file has already been uploaded to the blob
                        // store or not. Even if the blobStore might shortly be in an inconsistent state, we have
                        // to persist the blob. Otherwise we might not be able to recover the job.
                    }
    
                    if (blobStore != null) {
                        // only the one moving the incoming file to its final destination is allowed to upload the
                        // file to the blob store
                        blobStore.put(storageFile, jobId, blobKey);
                    }
                } else {
                    log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
                }
                storageFile = null;
            } finally {
                // we failed to either create the local storage file or to upload it --> try to delete the local file
                // while still having the write lock
                if (storageFile != null && !storageFile.delete() && storageFile.exists()) {
                    log.warn("Could not delete the storage file {}.", storageFile);
                }
                if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
                    log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
                }
            }
        }
    
        //......
    }
    
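    • initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory) from the configuration. If it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one at random as baseDir. storageDir is then a subdirectory of baseDir named blobStore-<random UUID>. Creating it is attempted up to 10 times before an IOException is thrown.
    • getStorageLocation builds the concrete storage path under storageDir from the JobID and BlobKey and creates its parent directories. The path has the format $base/no_job/blob_$key for job-unrelated blobs, or $base/job_$jobId/blob_$key otherwise.
    • moveTempFileToStore uses Files.move to move incomingFile to storageFile only when the target does not exist yet. If blobStore is not null, it also puts storageFile into the BlobStore (the HA store). If the target already exists, it logs a warning and ignores the newer upload.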
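The three BlobUtils helpers can likewise be approximated with java.io and java.nio. BlobUtilsSketch is hypothetical and simplifies Flink's versions in three ways:

- The configured directory and the list of temp directories are passed in directly instead of being read from a Flink Configuration.
- Keys and job IDs are plain strings.
- moveIfAbsent only handles the local move. Flink's moveTempFileToStore additionally uploads the file to the HA BlobStore when one is configured (even after a FileAlreadyExistsException); that step is omitted here.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class BlobUtilsSketch {
    private static final Random RANDOM = new Random();

    /** Configured dir if set, else a random temp dir; then a fresh blobStore-<UUID> subdirectory. */
    static File initLocalStorageDirectory(String configuredDir, List<String> tmpDirs) throws IOException {
        File baseDir = (configuredDir == null || configuredDir.trim().isEmpty())
                ? new File(tmpDirs.get(RANDOM.nextInt(tmpDirs.size())))
                : new File(configuredDir);
        for (int attempt = 0; attempt < 10; attempt++) {
            File storageDir = new File(baseDir, "blobStore-" + UUID.randomUUID());
            if (storageDir.mkdirs()) { // false if it already exists or cannot be created
                return storageDir;
            }
        }
        throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
    }

    /** Path scheme: $base/no_job/blob_$key or $base/job_$jobId/blob_$key. */
    static String storageLocationPath(String storageDir, String jobId, String key) {
        return jobId == null
                ? String.format("%s/no_job/blob_%s", storageDir, key)
                : String.format("%s/job_%s/blob_%s", storageDir, jobId, key);
    }

    /** Moves incoming to storage only if absent; returns true if this call performed the move. */
    static boolean moveIfAbsent(File incoming, File storage) throws IOException {
        if (storage.exists()) {
            Files.deleteIfExists(incoming.toPath()); // duplicate upload: keep the existing file
            return false;
        }
        Files.createDirectories(storage.getParentFile().toPath());
        try {
            // Without REPLACE_EXISTING, the move fails if the target appeared in the meantime
            Files.move(incoming.toPath(), storage.toPath());
            return true;
        } catch (FileAlreadyExistsException e) {
            Files.deleteIfExists(incoming.toPath()); // lost a race with another writer
            return false;
        }
    }
}
```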

    Summary

    • BlobWriter defines the putPermanent and getMinOffloadingSize methods. It also provides the static method serializeAndTryOffload, which serializes a value and, when its size reaches the minimum offloading size, stores it via blobWriter.putPermanent.
    • BlobServer implements BlobWriter. Its putPermanent methods delegate to putBuffer and putInputStream, and getMinOffloadingSize returns BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration (default 1 MiB). putBuffer (byte[]) and putInputStream (InputStream) both write the data to a temporary file first and then call moveTempFileToStore to persist it. moveTempFileToStore in turn calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location. storageDir is initialized by BlobUtils.initLocalStorageDirectory(config), and storageFile is obtained from BlobUtils.getStorageLocation(storageDir, jobId, blobKey).
    • BlobUtils.initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory). If it is unset, it gets tmpDirPaths from ConfigurationUtils.parseTempDirectories and picks one at random as baseDir; storageDir is a subdirectory of baseDir whose name starts with blobStore. getStorageLocation builds the concrete path under storageDir from the JobID and BlobKey, as $base/no_job/blob_$key or $base/job_$jobId/blob_$key. moveTempFileToStore moves incomingFile to storageFile with Files.move when the target does not exist yet. If blobStore is not null, it also puts storageFile into the BlobStore.
