A Look at Flink's BlobService

Author: go4it | Published 2019-02-27 10:09

    This article takes a look at Flink's BlobService.

    BlobService

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java

    /**
     * A simple store and retrieve binary large objects (BLOBs).
     */
    public interface BlobService extends Closeable {
    
        /**
         * Returns a BLOB service for accessing permanent BLOBs.
         *
         * @return BLOB service
         */
        PermanentBlobService getPermanentBlobService();
    
        /**
         * Returns a BLOB service for accessing transient BLOBs.
         *
         * @return BLOB service
         */
        TransientBlobService getTransientBlobService();
    
        /**
         * Returns the port of the BLOB server that this BLOB service is working with.
         *
         * @return the port the blob server.
         */
        int getPort();
    }
    
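    • BlobService defines getPermanentBlobService, which returns a PermanentBlobService, and getTransientBlobService, which returns a TransientBlobService; getPort returns the port of the BLOB server this service works with.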
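    To make the facade structure concrete, here is a minimal sketch. The names PermanentStore, TransientStore, BlobFacade and InMemoryBlobFacade are hypothetical, simplified stand-ins, not Flink's API: they use String keys and in-memory byte arrays instead of BlobKey subclasses and local files. The point is only that one entry object hands out two specialized sub-services and reports a server port.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's interfaces (not the real API):
// String keys and in-memory byte arrays instead of BlobKey subclasses and files.
interface PermanentStore extends Closeable {
    byte[] get(String jobId, String key);
}

interface TransientStore extends Closeable {
    byte[] get(String key);
}

interface BlobFacade extends Closeable {
    PermanentStore getPermanentBlobService();
    TransientStore getTransientBlobService();
    int getPort();
}

final class InMemoryBlobFacade implements BlobFacade {
    private final int port;
    final Map<String, byte[]> permanent = new HashMap<>();      // keyed by "jobId/key"
    final Map<String, byte[]> transientBlobs = new HashMap<>(); // keyed by key only

    InMemoryBlobFacade(int port) {
        this.port = port;
    }

    @Override
    public PermanentStore getPermanentBlobService() {
        // A view onto the shared permanent map; job-scoped lookups
        return new PermanentStore() {
            @Override public byte[] get(String jobId, String key) { return permanent.get(jobId + "/" + key); }
            @Override public void close() { }
        };
    }

    @Override
    public TransientStore getTransientBlobService() {
        // A view onto the shared transient map; job-unrelated lookups
        return new TransientStore() {
            @Override public byte[] get(String key) { return transientBlobs.get(key); }
            @Override public void close() { }
        };
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public void close() {
        // Closing the facade releases everything both sub-services expose
        permanent.clear();
        transientBlobs.clear();
    }
}
```

    The port value passed to the constructor is arbitrary in this sketch. Both sub-services read from the facade's own maps, so closing the facade empties what either of them can return.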

    PermanentBlobService

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java

    /**
     * A service to retrieve permanent binary large objects (BLOBs).
     *
     * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
     * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
     * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
     */
    public interface PermanentBlobService extends Closeable {
    
        /**
         * Returns the path to a local copy of the file associated with the provided job ID and blob
         * key.
         *
         * @param jobId
         *      ID of the job this blob belongs to
         * @param key
         *      BLOB key associated with the requested file
         *
         * @return The path to the file.
         *
         * @throws java.io.FileNotFoundException
         *      if the BLOB does not exist;
         * @throws IOException
         *      if any other error occurs when retrieving the file
         */
        File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
    
    }
    
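    • PermanentBlobService provides a getFile method that returns the local File for a given JobID and PermanentBlobKey, throwing FileNotFoundException if the BLOB does not exist.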
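    The getFile contract (return the path of a local copy, or throw FileNotFoundException when the BLOB is missing) can be illustrated with a small sketch. LocalPermanentBlobCache is hypothetical, and the directory layout <storageDir>/job_<jobId>/blob_<key> is an assumption chosen for illustration. The sketch also skips what a real cache would normally do on a miss, namely try to fetch the BLOB before giving up.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical local lookup illustrating the getFile(jobId, key) contract:
// return the path of an existing local copy, or throw FileNotFoundException.
final class LocalPermanentBlobCache {
    private final File storageDir;

    LocalPermanentBlobCache(File storageDir) {
        this.storageDir = storageDir;
    }

    // Assumed layout for illustration only: <storageDir>/job_<jobId>/blob_<key>
    File locate(String jobId, String key) {
        return new File(new File(storageDir, "job_" + jobId), "blob_" + key);
    }

    File getFile(String jobId, String key) throws IOException {
        File f = locate(jobId, key);
        if (!f.exists()) {
            // A missing BLOB yields FileNotFoundException, as the interface documents
            throw new FileNotFoundException("BLOB " + key + " of job " + jobId + " not found at " + f);
        }
        return f;
    }
}
```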

    TransientBlobService

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java

    /**
     * A service to retrieve transient binary large objects (BLOBs) which are deleted on the
     * {@link BlobServer} when they are retrieved.
     *
     * <p>These may include per-job BLOBs like files in the {@link
     * org.apache.flink.api.common.cache.DistributedCache}, for example.
     *
     * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
     * {@link PermanentBlobService}.
     *
     * <p>TODO: change API to not rely on local files but return {@link InputStream} objects
     */
    public interface TransientBlobService extends Closeable {
    
        // --------------------------------------------------------------------------------------------
        //  GET
        // --------------------------------------------------------------------------------------------
    
        /**
         * Returns the path to a local copy of the (job-unrelated) file associated with the provided
         * blob key.
         *
         * @param key
         *      blob key associated with the requested file
         *
         * @return The path to the file.
         *
         * @throws java.io.FileNotFoundException
         *      when the path does not exist;
         * @throws IOException
         *      if any other error occurs when retrieving the file
         */
        File getFile(TransientBlobKey key) throws IOException;
    
        /**
         * Returns the path to a local copy of the file associated with the provided job ID and blob
         * key.
         *
         * @param jobId
         *      ID of the job this blob belongs to
         * @param key
         *      blob key associated with the requested file
         *
         * @return The path to the file.
         *
         * @throws java.io.FileNotFoundException
         *      when the path does not exist;
         * @throws IOException
         *      if any other error occurs when retrieving the file
         */
        File getFile(JobID jobId, TransientBlobKey key) throws IOException;
    
        // --------------------------------------------------------------------------------------------
        //  PUT
        // --------------------------------------------------------------------------------------------
    
        /**
         * Uploads the (job-unrelated) data of the given byte array to the BLOB server.
         *
         * @param value
         *      the buffer to upload
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while uploading the data to the BLOB server
         */
        TransientBlobKey putTransient(byte[] value) throws IOException;
    
        /**
         * Uploads the data of the given byte array for the given job to the BLOB server.
         *
         * @param jobId
         *      the ID of the job the BLOB belongs to
         * @param value
         *      the buffer to upload
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while uploading the data to the BLOB server
         */
        TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;
    
        /**
         * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
         *
         * @param inputStream
         *      the input stream to read the data from
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while reading the data from the input stream or uploading the
         *      data to the BLOB server
         */
        TransientBlobKey putTransient(InputStream inputStream) throws IOException;
    
        /**
         * Uploads the data from the given input stream for the given job to the BLOB server.
         *
         * @param jobId
         *      ID of the job this blob belongs to
         * @param inputStream
         *      the input stream to read the data from
         *
         * @return the computed BLOB key identifying the BLOB on the server
         *
         * @throws IOException
         *      thrown if an I/O error occurs while reading the data from the input stream or uploading the
         *      data to the BLOB server
         */
        TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;
    
        // --------------------------------------------------------------------------------------------
        //  DELETE
        // --------------------------------------------------------------------------------------------
    
        /**
         * Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
         *
         * @param key
         *      associated with the file to be deleted
         *
         * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
         *          <tt>false</tt> otherwise
         */
        boolean deleteFromCache(TransientBlobKey key);
    
        /**
         * Deletes the file associated with the provided job ID and blob key from the local cache.
         *
         * @param jobId
         *      ID of the job this blob belongs to
         * @param key
         *      associated with the file to be deleted
         *
         * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
         *          <tt>false</tt> otherwise
         */
        boolean deleteFromCache(JobID jobId, TransientBlobKey key);
    
    }
    
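    • TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer once they have been retrieved; none of them is highly available. It provides getFile, putTransient and deleteFromCache, each in a job-unrelated variant and a variant that takes a JobID.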
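    The transient lifecycle (upload, retrieve once so that the server copy disappears, then clean up the local copy) can be modeled in memory. TransientBlobModel is a hypothetical class, not Flink's implementation: two maps stand in for the BlobServer and the local cache, and the key is derived from a SHA-1 digest of the content. SHA-1 fits the 20-byte size BlobKey uses, but treat it as an assumption here; real keys also carry a random component.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the transient BLOB lifecycle (not Flink's code).
final class TransientBlobModel {
    private final Map<String, byte[]> server = new HashMap<>();     // stands in for the BlobServer
    private final Map<String, byte[]> localCache = new HashMap<>(); // stands in for the local cache

    // Upload: key = "t-" + hex(SHA-1(content)); assumption, real keys add a random part
    String putTransient(byte[] value) {
        String key = "t-" + sha1Hex(value);
        server.put(key, value.clone());
        return key;
    }

    // Retrieve: serve from the local cache, else move the BLOB from "server" to cache
    byte[] getFile(String key) {
        byte[] cached = localCache.get(key);
        if (cached != null) {
            return cached;
        }
        byte[] data = server.remove(key); // the server copy is gone after retrieval
        if (data == null) {
            throw new IllegalStateException("BLOB not found: " + key);
        }
        localCache.put(key, data);
        return data;
    }

    // Returns true if deleted or non-existing; an in-memory removal cannot fail
    boolean deleteFromCache(String key) {
        localCache.remove(key);
        return true;
    }

    boolean onServer(String key) {
        return server.containsKey(key);
    }

    private static String sha1Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1").digest(data);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```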

    BlobKey

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java

    /**
     * A BLOB key uniquely identifies a BLOB.
     */
    public abstract class BlobKey implements Serializable, Comparable<BlobKey> {
    
        private static final long serialVersionUID = 3847117712521785209L;
    
        /** Size of the internal BLOB key in bytes. */
        public static final int SIZE = 20;
    
        /** The byte buffer storing the actual key data. */
        private final byte[] key;
    
        /**
         * (Internal) BLOB type - to be reflected by the inheriting sub-class.
         */
        private final BlobType type;
    
        /**
         * BLOB type, i.e. permanent or transient.
         */
        enum BlobType {
            /**
             * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
             * available.
             */
            PERMANENT_BLOB,
            /**
             * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
             * highly available.
             */
            TRANSIENT_BLOB
        }
    
        /**
         * Random component of the key.
         */
        private final AbstractID random;
    
        /**
         * Constructs a new BLOB key.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         */
        protected BlobKey(BlobType type) {
            this.type = checkNotNull(type);
            this.key = new byte[SIZE];
            this.random = new AbstractID();
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         * @param key
         *        the actual key data
         */
        protected BlobKey(BlobType type, byte[] key) {
            if (key == null || key.length != SIZE) {
                throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
            }
    
            this.type = checkNotNull(type);
            this.key = key;
            this.random = new AbstractID();
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         * @param key
         *        the actual key data
         * @param random
         *        the random component of the key
         */
        protected BlobKey(BlobType type, byte[] key, byte[] random) {
            if (key == null || key.length != SIZE) {
                throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
            }
    
            this.type = checkNotNull(type);
            this.key = key;
            this.random = new AbstractID(random);
        }
    
        /**
         * Returns the right {@link BlobKey} subclass for the given parameters.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         *
         * @return BlobKey subclass
         */
        @VisibleForTesting
        static BlobKey createKey(BlobType type) {
            if (type == PERMANENT_BLOB) {
                return new PermanentBlobKey();
            } else {
                return new TransientBlobKey();
            }
        }
    
        /**
         * Returns the right {@link BlobKey} subclass for the given parameters.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         * @param key
         *        the actual key data
         *
         * @return BlobKey subclass
         */
        static BlobKey createKey(BlobType type, byte[] key) {
            if (type == PERMANENT_BLOB) {
                return new PermanentBlobKey(key);
            } else {
                return new TransientBlobKey(key);
            }
        }
    
        /**
         * Returns the right {@link BlobKey} subclass for the given parameters.
         *
         * @param type
         *      whether the referenced BLOB is permanent or transient
         * @param key
         *        the actual key data
         * @param random
         *        the random component of the key
         *
         * @return BlobKey subclass
         */
        static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
            if (type == PERMANENT_BLOB) {
                return new PermanentBlobKey(key, random);
            } else {
                return new TransientBlobKey(key, random);
            }
        }
    
        /**
         * Returns the hash component of this key.
         *
         * @return a 20 bit hash of the contents the key refers to
         */
        @VisibleForTesting
        public byte[] getHash() {
            return key;
        }
    
        /**
         * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
         *
         * @return BLOB type, i.e. permanent or transient
         */
        BlobType getType() {
            return type;
        }
    
        /**
         * Adds the BLOB key to the given {@link MessageDigest}.
         *
         * @param md
         *        the message digest to add the BLOB key to
         */
        public void addToMessageDigest(MessageDigest md) {
            md.update(this.key);
        }
    
        @Override
        public boolean equals(final Object obj) {
    
            if (!(obj instanceof BlobKey)) {
                return false;
            }
    
            final BlobKey bk = (BlobKey) obj;
    
            return Arrays.equals(this.key, bk.key) &&
                this.type == bk.type &&
                this.random.equals(bk.random);
        }
    
        @Override
        public int hashCode() {
            int result = Arrays.hashCode(this.key);
            result = 37 * result + this.type.hashCode();
            result = 37 * result + this.random.hashCode();
            return result;
        }
    
        @Override
        public String toString() {
            final String typeString;
            switch (this.type) {
                case TRANSIENT_BLOB:
                    typeString = "t-";
                    break;
                case PERMANENT_BLOB:
                    typeString = "p-";
                    break;
                default:
                    // this actually never happens!
                    throw new IllegalStateException("Invalid BLOB type");
            }
            return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
        }
    
        @Override
        public int compareTo(BlobKey o) {
            // compare the hashes first
            final byte[] aarr = this.key;
            final byte[] barr = o.key;
            final int len = Math.min(aarr.length, barr.length);
    
            for (int i = 0; i < len; ++i) {
                final int a = (aarr[i] & 0xff);
                final int b = (barr[i] & 0xff);
                if (a != b) {
                    return a - b;
                }
            }
    
            if (aarr.length == barr.length) {
                // same hash contents - compare the BLOB types
                int typeCompare = this.type.compareTo(o.type);
                if (typeCompare == 0) {
                    // same type - compare random components
                    return this.random.compareTo(o.random);
                } else {
                    return typeCompare;
                }
            } else {
                return aarr.length - barr.length;
            }
        }
    
        // --------------------------------------------------------------------------------------------
    
        /**
         * Auxiliary method to read a BLOB key from an input stream.
         *
         * @param inputStream
         *        the input stream to read the BLOB key from
         * @return the read BLOB key
         * @throws IOException
         *         throw if an I/O error occurs while reading from the input stream
         */
        static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
    
            final byte[] key = new byte[BlobKey.SIZE];
            final byte[] random = new byte[AbstractID.SIZE];
    
            int bytesRead = 0;
            // read key
            while (bytesRead < key.length) {
                final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
                if (read < 0) {
                    throw new EOFException("Read an incomplete BLOB key");
                }
                bytesRead += read;
            }
    
            // read BLOB type
            final BlobType blobType;
            {
                final int read = inputStream.read();
                if (read < 0) {
                    throw new EOFException("Read an incomplete BLOB type");
                } else if (read == TRANSIENT_BLOB.ordinal()) {
                    blobType = TRANSIENT_BLOB;
                } else if (read == PERMANENT_BLOB.ordinal()) {
                    blobType = PERMANENT_BLOB;
                } else {
                    throw new IOException("Invalid data received for the BLOB type: " + read);
                }
            }
    
            // read random component
            bytesRead = 0;
            while (bytesRead < AbstractID.SIZE) {
                final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
                if (read < 0) {
                    throw new EOFException("Read an incomplete BLOB key");
                }
                bytesRead += read;
            }
    
            return createKey(blobType, key, random);
        }
    
        /**
         * Auxiliary method to write this BLOB key to an output stream.
         *
         * @param outputStream
         *        the output stream to write the BLOB key to
         * @throws IOException
         *         thrown if an I/O error occurs while writing the BLOB key
         */
        void writeToOutputStream(final OutputStream outputStream) throws IOException {
            outputStream.write(this.key);
            outputStream.write(this.type.ordinal());
            outputStream.write(this.random.getBytes());
        }
    }
    
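    • BlobKey is an abstract class with three fields: key (the 20-byte hash), type (a BlobType) and random (an AbstractID). BlobType is either PERMANENT_BLOB or TRANSIENT_BLOB. The static createKey methods create the BlobKey subclass matching a given BlobType; readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes it to an OutputStream. BlobKey has two subclasses, PermanentBlobKey and TransientBlobKey.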
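    The wire format used by writeToOutputStream and readFromInputStream is 20 hash bytes, one byte holding the BlobType ordinal, and 16 random bytes (AbstractID.SIZE), 37 bytes in total. The sketch below re-implements that layout in a standalone class (WireBlobKey is hypothetical, not Flink's class) and uses DataInputStream.readFully in place of the hand-written read loops; both throw EOFException on a truncated stream.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Simplified re-implementation of BlobKey's wire format (not Flink's class):
// 20-byte hash, 1-byte type ordinal, 16-byte random component = 37 bytes.
final class WireBlobKey {
    // Same constant order as BlobKey.BlobType, so the ordinals match
    enum Type { PERMANENT_BLOB, TRANSIENT_BLOB }

    static final int HASH_SIZE = 20;   // BlobKey.SIZE
    static final int RANDOM_SIZE = 16; // AbstractID.SIZE

    final byte[] hash;
    final Type type;
    final byte[] random;

    WireBlobKey(byte[] hash, Type type, byte[] random) {
        if (hash.length != HASH_SIZE || random.length != RANDOM_SIZE) {
            throw new IllegalArgumentException("bad component sizes");
        }
        this.hash = hash;
        this.type = type;
        this.random = random;
    }

    void writeTo(OutputStream out) throws IOException {
        out.write(hash);
        out.write(type.ordinal()); // a single byte
        out.write(random);
    }

    static WireBlobKey readFrom(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        byte[] hash = new byte[HASH_SIZE];
        din.readFully(hash); // EOFException if the stream ends early
        int t = din.read();
        if (t < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        }
        if (t >= Type.values().length) {
            throw new IOException("Invalid data received for the BLOB type: " + t);
        }
        byte[] random = new byte[RANDOM_SIZE];
        din.readFully(random);
        return new WireBlobKey(hash, Type.values()[t], random);
    }
}
```

    Because the type travels as its enum ordinal, reordering the BlobType constants would change the meaning of bytes already written; the original code's explicit comparisons against TRANSIENT_BLOB.ordinal() and PERMANENT_BLOB.ordinal() reject any other value with an IOException.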

    PermanentBlobKey

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java

    /**
     * BLOB key referencing permanent BLOB files.
     */
    public final class PermanentBlobKey extends BlobKey {
    
        /**
         * Constructs a new BLOB key.
         */
        @VisibleForTesting
        public PermanentBlobKey() {
            super(BlobType.PERMANENT_BLOB);
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param key
         *        the actual key data
         */
        PermanentBlobKey(byte[] key) {
            super(BlobType.PERMANENT_BLOB, key);
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param key
         *        the actual key data
         * @param random
         *        the random component of the key
         */
        PermanentBlobKey(byte[] key, byte[] random) {
            super(BlobType.PERMANENT_BLOB, key, random);
        }
    }
    
    • PermanentBlobKey extends BlobKey with its BlobType fixed to BlobType.PERMANENT_BLOB.

    TransientBlobKey

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java

    /**
     * BLOB key referencing transient BLOB files.
     */
    public final class TransientBlobKey extends BlobKey {
    
        /**
         * Constructs a new BLOB key.
         */
        @VisibleForTesting
        public TransientBlobKey() {
            super(BlobType.TRANSIENT_BLOB);
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param key
         *        the actual key data
         */
        TransientBlobKey(byte[] key) {
            super(BlobType.TRANSIENT_BLOB, key);
        }
    
        /**
         * Constructs a new BLOB key from the given byte array.
         *
         * @param key
         *        the actual key data
         * @param random
         *        the random component of the key
         */
        TransientBlobKey(byte[] key, byte[] random) {
            super(BlobType.TRANSIENT_BLOB, key, random);
        }
    }
    
    • TransientBlobKey extends BlobKey with its BlobType fixed to BlobType.TRANSIENT_BLOB.
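    The two subclasses differ only in the type they pass to the BlobKey constructor, and that type surfaces in toString() as a "p-" or "t-" prefix. The hypothetical helper KeyFormatDemo (not part of Flink) mimics the shape of BlobKey.toString() and the unsigned byte comparison at the start of BlobKey.compareTo(); the lowercase hex encoding is an assumed stand-in for StringUtils.byteToHexString.

```java
// Hypothetical helper mimicking BlobKey.toString() and the first step of compareTo().
final class KeyFormatDemo {
    // "p-" or "t-" + hex(hash) + "-" + hex(random)
    static String format(boolean permanent, byte[] hash, byte[] random) {
        return (permanent ? "p-" : "t-") + hex(hash) + "-" + hex(random);
    }

    // Compare hashes byte by byte as unsigned values (b & 0xff), then by length
    static int compareHashes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b)); // Formatter treats a negative Byte as unsigned
        }
        return sb.toString();
    }
}
```

    The & 0xff mask matters because Java's byte is signed: without it, 0x80 (read as -128) would sort before 0x7f (127).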

    AbstractID

    flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java

    /**
     * A statistically unique identification number.
     */
    @PublicEvolving
    public class AbstractID implements Comparable<AbstractID>, java.io.Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private static final Random RND = new Random();
    
        /** The size of a long in bytes. */
        private static final int SIZE_OF_LONG = 8;
    
        /** The size of the ID in byte. */
        public static final int SIZE = 2 * SIZE_OF_LONG;
    
        // ------------------------------------------------------------------------
    
        /** The upper part of the actual ID. */
        protected final long upperPart;
    
        /** The lower part of the actual ID. */
        protected final long lowerPart;
    
        /** The memoized value returned by toString(). */
        private transient String toString;
    
        // --------------------------------------------------------------------------------------------
    
        /**
         * Constructs a new ID with a specific bytes value.
         */
        public AbstractID(byte[] bytes) {
            if (bytes == null || bytes.length != SIZE) {
                throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
            }
    
            this.lowerPart = byteArrayToLong(bytes, 0);
            this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
        }
    
        /**
         * Constructs a new abstract ID.
         *
         * @param lowerPart the lower bytes of the ID
         * @param upperPart the higher bytes of the ID
         */
        public AbstractID(long lowerPart, long upperPart) {
            this.lowerPart = lowerPart;
            this.upperPart = upperPart;
        }
    
        /**
         * Copy constructor: Creates a new abstract ID from the given one.
         *
         * @param id the abstract ID to copy
         */
        public AbstractID(AbstractID id) {
            if (id == null) {
                throw new IllegalArgumentException("Id must not be null.");
            }
            this.lowerPart = id.lowerPart;
            this.upperPart = id.upperPart;
        }
    
        /**
         * Constructs a new random ID from a uniform distribution.
         */
        public AbstractID() {
            this.lowerPart = RND.nextLong();
            this.upperPart = RND.nextLong();
        }
    
        // --------------------------------------------------------------------------------------------
    
        /**
         * Gets the lower 64 bits of the ID.
         *
         * @return The lower 64 bits of the ID.
         */
        public long getLowerPart() {
            return lowerPart;
        }
    
        /**
         * Gets the upper 64 bits of the ID.
         *
         * @return The upper 64 bits of the ID.
         */
        public long getUpperPart() {
            return upperPart;
        }
    
        /**
         * Gets the bytes underlying this ID.
         *
         * @return The bytes underlying this ID.
         */
        public byte[] getBytes() {
            byte[] bytes = new byte[SIZE];
            longToByteArray(lowerPart, bytes, 0);
            longToByteArray(upperPart, bytes, SIZE_OF_LONG);
            return bytes;
        }
    
        // --------------------------------------------------------------------------------------------
        //  Standard Utilities
        // --------------------------------------------------------------------------------------------
    
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            } else if (obj != null && obj.getClass() == getClass()) {
                AbstractID that = (AbstractID) obj;
                return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
            } else {
                return false;
            }
        }
    
        @Override
        public int hashCode() {
            return ((int)  this.lowerPart) ^
                    ((int) (this.lowerPart >>> 32)) ^
                    ((int)  this.upperPart) ^
                    ((int) (this.upperPart >>> 32));
        }
    
        @Override
        public String toString() {
            if (this.toString == null) {
                final byte[] ba = new byte[SIZE];
                longToByteArray(this.lowerPart, ba, 0);
                longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
    
                this.toString = StringUtils.byteToHexString(ba);
            }
    
            return this.toString;
        }
    
        @Override
        public int compareTo(AbstractID o) {
            int diff1 = Long.compare(this.upperPart, o.upperPart);
            int diff2 = Long.compare(this.lowerPart, o.lowerPart);
            return diff1 == 0 ? diff2 : diff1;
        }
    
        // --------------------------------------------------------------------------------------------
        //  Conversion Utilities
        // --------------------------------------------------------------------------------------------
    
        /**
         * Converts the given byte array to a long.
         *
         * @param ba the byte array to be converted
         * @param offset the offset indicating at which byte inside the array the conversion shall begin
         * @return the long variable
         */
        private static long byteArrayToLong(byte[] ba, int offset) {
            long l = 0;
    
            for (int i = 0; i < SIZE_OF_LONG; ++i) {
                l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
            }
    
            return l;
        }
    
        /**
         * Converts a long to a byte array.
         *
         * @param l the long variable to be converted
         * @param ba the byte array to store the result the of the conversion
         * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
         */
        private static void longToByteArray(long l, byte[] ba, int offset) {
            for (int i = 0; i < SIZE_OF_LONG; ++i) {
                final int shift = i << 3; // i * 8
                ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
            }
        }
    }
    
    • AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor fills both with Random.nextLong; the byte[] constructor parses lowerPart and upperPart from the given bytes; and the (lowerPart, upperPart) constructor sets them directly.
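    AbstractID.getBytes() writes lowerPart into bytes 0 to 7 and upperPart into bytes 8 to 15, each most-significant byte first. The sketch below copies the private byteArrayToLong helper into a hypothetical IdBytes class and builds the byte form with java.nio.ByteBuffer, whose default byte order is big-endian, to check that the two layouts agree.

```java
import java.nio.ByteBuffer;

// Hypothetical class: AbstractID's private conversion helper, compared with ByteBuffer.
final class IdBytes {
    static final int SIZE_OF_LONG = 8;

    // Copied logic of AbstractID.byteArrayToLong: the last byte is the least significant
    static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        return l;
    }

    // Same layout as AbstractID.getBytes(): lowerPart at 0..7, upperPart at 8..15, big-endian
    static byte[] toBytes(long lowerPart, long upperPart) {
        return ByteBuffer.allocate(2 * SIZE_OF_LONG).putLong(lowerPart).putLong(upperPart).array();
    }
}
```

    One consequence worth noting: compareTo compares upperPart before lowerPart (with signed Long.compare), while getBytes() serializes lowerPart first, so the ordering of IDs does not follow the lexicographic order of their byte form.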

    Summary

    • BlobService defines getPermanentBlobService for obtaining a PermanentBlobService and getTransientBlobService for obtaining a TransientBlobService. PermanentBlobService provides getFile, which returns the File for a given JobID and PermanentBlobKey. TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer once they have been retrieved; it provides getFile, putTransient and deleteFromCache.
    • BlobKey is an abstract class with three fields: key, type (a BlobType) and random (an AbstractID); BlobType is either PERMANENT_BLOB or TRANSIENT_BLOB. It defines static createKey methods that create a BlobKey of the given BlobType, readFromInputStream to deserialize a BlobKey from an InputStream, and writeToOutputStream to serialize a BlobKey to an OutputStream. Its two subclasses, PermanentBlobKey and TransientBlobKey, extend BlobKey with the BlobType fixed to PERMANENT_BLOB and TRANSIENT_BLOB respectively.
    • AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor fills both with Random.nextLong; the byte[] constructor parses lowerPart and upperPart from the given bytes; and the (lowerPart, upperPart) constructor sets them directly.
