A Look at Flink's FileSystem

Author: go4it | Published 2019-03-02 10:44

    This article takes a look at Flink's FileSystem abstraction.

    FileSystem

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

    @Public
    public abstract class FileSystem {
    
        /**
         * The possible write modes. The write mode decides what happens if a file should be created,
         * but already exists.
         */
        public enum WriteMode {
    
            /** Creates the target file only if no file exists at that path already.
             * Does not overwrite existing files and directories. */
            NO_OVERWRITE,
    
            /** Creates a new target file regardless of any existing files or directories.
             * Existing files and directories will be deleted (recursively) automatically before
             * creating the new file. */
            OVERWRITE
        }
    
        // ------------------------------------------------------------------------
    
        /** Logger for all FileSystem work. */
        private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
    
        /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
         * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
        private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
    
        /** Object used to protect calls to specific methods.*/
        private static final ReentrantLock LOCK = new ReentrantLock(true);
    
        /** Cache for file systems, by scheme + authority. */
        private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
    
        /** All available file system factories. */
        private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
    
        /** Mapping of file system schemes to the corresponding factories,
         * populated in {@link FileSystem#initialize(Configuration)}. */
        private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
    
        /** The default factory that is used when no scheme matches. */
        private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
    
        /** The default filesystem scheme to be used, configured during process-wide initialization.
         * This value defaults to the local file system's scheme {@code 'file:///'} or {@code 'file:/'}. */
        private static URI defaultScheme;
    
        //......
    
        // ------------------------------------------------------------------------
        //  Initialization
        // ------------------------------------------------------------------------
    
        /**
         * Initializes the shared file system settings.
         *
         * <p>The given configuration is passed to each file system factory to initialize the respective
         * file systems. Because the configuration of file systems may be different subsequent to the call
         * of this method, this method clears the file system instance cache.
         *
         * <p>This method also reads the default file system URI from the configuration key
         * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
         * the URI has no scheme will be interpreted as relative to that URI.
         * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
         * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
         * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
         *
         * @param config the configuration from where to fetch the parameter.
         */
        public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
            LOCK.lock();
            try {
                // make sure file systems are re-instantiated after re-configuration
                CACHE.clear();
                FS_FACTORIES.clear();
    
                // configure all file system factories
                for (FileSystemFactory factory : RAW_FACTORIES) {
                    factory.configure(config);
                    String scheme = factory.getScheme();
    
                    FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
                    FS_FACTORIES.put(scheme, fsf);
                }
    
                // configure the default (fallback) factory
                FALLBACK_FACTORY.configure(config);
    
                // also read the default file system scheme
                final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
                if (stringifiedUri == null) {
                    defaultScheme = null;
                }
                else {
                    try {
                        defaultScheme = new URI(stringifiedUri);
                    }
                    catch (URISyntaxException e) {
                        throw new IllegalConfigurationException("The default file system scheme ('" +
                                CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
                    }
                }
            }
            finally {
                LOCK.unlock();
            }
        }
    
        // ------------------------------------------------------------------------
        //  Obtaining File System Instances
        // ------------------------------------------------------------------------
    
        public static FileSystem getLocalFileSystem() {
            return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance());
        }
    
        public static FileSystem get(URI uri) throws IOException {
            return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
        }
    
        @Internal
        public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
            checkNotNull(fsUri, "file system URI");
    
            LOCK.lock();
            try {
                final URI uri;
    
                if (fsUri.getScheme() != null) {
                    uri = fsUri;
                }
                else {
                    // Apply the default fs scheme
                    final URI defaultUri = getDefaultFsUri();
                    URI rewrittenUri = null;
    
                    try {
                        rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                                defaultUri.getPort(), fsUri.getPath(), null, null);
                    }
                    catch (URISyntaxException e) {
                        // for local URIs, we make one more try to repair the path by making it absolute
                        if (defaultUri.getScheme().equals("file")) {
                            try {
                                rewrittenUri = new URI(
                                        "file", null,
                                        new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                                        null);
                            } catch (URISyntaxException ignored) {
                                // could not help it...
                            }
                        }
                    }
    
                    if (rewrittenUri != null) {
                        uri = rewrittenUri;
                    }
                    else {
                        throw new IOException("The file system URI '" + fsUri +
                                "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                                + defaultUri + ").");
                    }
                }
    
                // print a helpful pointer for malformed local URIs (happens a lot to new users)
                if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
                    String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
    
                    throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '"
                            + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')");
                }
    
                final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());
    
                // See if there is a file system object in the cache
                {
                    FileSystem cached = CACHE.get(key);
                    if (cached != null) {
                        return cached;
                    }
                }
    
                // this "default" initialization makes sure that the FileSystem class works
                // even when not configured with an explicit Flink configuration, like on
                // JobManager or TaskManager setup
                if (FS_FACTORIES.isEmpty()) {
                    initialize(new Configuration());
                }
    
                // Try to create a new file system
                final FileSystem fs;
                final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
    
                if (factory != null) {
                    fs = factory.create(uri);
                }
                else {
                    try {
                        fs = FALLBACK_FACTORY.create(uri);
                    }
                    catch (UnsupportedFileSystemSchemeException e) {
                        throw new UnsupportedFileSystemSchemeException(
                                "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                        "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                        "system to support this scheme could be loaded.", e);
                    }
                }
    
                CACHE.put(key, fs);
                return fs;
            }
            finally {
                LOCK.unlock();
            }
        }
    
        public static URI getDefaultFsUri() {
            return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
        }
    
        // ------------------------------------------------------------------------
        //  File System Methods
        // ------------------------------------------------------------------------
    
        public abstract Path getWorkingDirectory();
    
        public abstract Path getHomeDirectory();
    
        public abstract URI getUri();
    
        public abstract FileStatus getFileStatus(Path f) throws IOException;
    
        public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;
    
        public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
    
        public abstract FSDataInputStream open(Path f) throws IOException;
    
        public RecoverableWriter createRecoverableWriter() throws IOException {
            throw new UnsupportedOperationException("This file system does not support recoverable writers.");
        }
    
        public abstract FileStatus[] listStatus(Path f) throws IOException;
    
        public boolean exists(final Path f) throws IOException {
            try {
                return (getFileStatus(f) != null);
            } catch (FileNotFoundException e) {
                return false;
            }
        }
    
        public abstract boolean delete(Path f, boolean recursive) throws IOException;
    
        public abstract boolean mkdirs(Path f) throws IOException;
    
    
        public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
    
        public abstract boolean rename(Path src, Path dst) throws IOException;
    
        public abstract boolean isDistributedFS();
    
        public abstract FileSystemKind getKind();
    
        // ------------------------------------------------------------------------
        //  output directory initialization
        // ------------------------------------------------------------------------
    
        public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
            if (isDistributedFS()) {
                return false;
            }
    
            // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
            // concurrently work in this method (multiple output formats writing locally) might end
            // up deleting each other's directories and leave non-retrievable files, without necessarily
            // causing an exception. That results in very subtle issues, like output files looking as if
            // they are not getting created.
    
            // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
            // here can cancel faster
            try {
                OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
            }
            catch (InterruptedException e) {
                // restore the interruption state
                Thread.currentThread().interrupt();
    
                // leave the method - we don't have the lock anyways
                throw new IOException("The thread was interrupted while trying to initialize the output directory");
            }
    
            try {
                FileStatus status;
                try {
                    status = getFileStatus(outPath);
                }
                catch (FileNotFoundException e) {
                    // okay, the file is not there
                    status = null;
                }
    
                // check if path exists
                if (status != null) {
                    // path exists, check write mode
                    switch (writeMode) {
    
                    case NO_OVERWRITE:
                        if (status.isDir() && createDirectory) {
                            return true;
                        } else {
                            // file may not be overwritten
                            throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
                                    "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
                                    WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
                        }
    
                    case OVERWRITE:
                        if (status.isDir()) {
                            if (createDirectory) {
                                // directory exists and does not need to be created
                                return true;
                            } else {
                                // we will write in a single file, delete directory
                                try {
                                    delete(outPath, true);
                                }
                                catch (IOException e) {
                                    throw new IOException("Could not remove existing directory '" + outPath +
                                            "' to allow overwrite by result file", e);
                                }
                            }
                        }
                        else {
                            // delete file
                            try {
                                delete(outPath, false);
                            }
                            catch (IOException e) {
                                throw new IOException("Could not remove existing file '" + outPath +
                                        "' to allow overwrite by result file/directory", e);
                            }
                        }
                        break;
    
                    default:
                        throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                    }
                }
    
                if (createDirectory) {
                    // Output directory needs to be created
                    if (!exists(outPath)) {
                        mkdirs(outPath);
                    }
    
                    // double check that the output directory exists
                    try {
                        return getFileStatus(outPath).isDir();
                    }
                    catch (FileNotFoundException e) {
                        return false;
                    }
                }
                else {
                    // check that the output path does not exist and an output file
                    // can be created by the output format.
                    return !exists(outPath);
                }
            }
            finally {
                OUTPUT_DIRECTORY_INIT_LOCK.unlock();
            }
        }
    
        public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
            if (!isDistributedFS()) {
                return false;
            }
    
            // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
            // concurrently work in this method (multiple output formats writing locally) might end
            // up deleting each other's directories and leave non-retrievable files, without necessarily
            // causing an exception. That results in very subtle issues, like output files looking as if
            // they are not getting created.
    
            // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
            // here can cancel faster
            try {
                OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
            }
            catch (InterruptedException e) {
                // restore the interruption state
                Thread.currentThread().interrupt();
    
                // leave the method - we don't have the lock anyways
                throw new IOException("The thread was interrupted while trying to initialize the output directory");
            }
    
            try {
                // check if path exists
                if (exists(outPath)) {
                    // path exists, check write mode
                    switch(writeMode) {
    
                    case NO_OVERWRITE:
                        // file or directory may not be overwritten
                        throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
                                WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
                                    " mode to overwrite existing files and directories.");
    
                    case OVERWRITE:
                        // output path exists. We delete it and all contained files in case of a directory.
                        try {
                            delete(outPath, true);
                        } catch (IOException e) {
                            // Some other thread might already have deleted the path.
                            // If - for some other reason - the path could not be deleted,
                            // this will be handled later.
                        }
                        break;
    
                    default:
                        throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                    }
                }
    
                if (createDirectory) {
                    // Output directory needs to be created
                    try {
                        if (!exists(outPath)) {
                            mkdirs(outPath);
                        }
                    } catch (IOException ioe) {
                        // Some other thread might already have created the directory.
                        // If - for some other reason - the directory could not be created
                        // and the path does not exist, this will be handled later.
                    }
    
                    // double check that the output directory exists
                    return exists(outPath) && getFileStatus(outPath).isDir();
                }
                else {
                    // single file case: check that the output path does not exist and
                    // an output file can be created by the output format.
                    return !exists(outPath);
                }
            }
            finally {
                OUTPUT_DIRECTORY_INIT_LOCK.unlock();
            }
        }
    
        //......
    }
    
    • FileSystem is the abstract base class for all file systems Flink uses; a subclass may be backed by the local file system or by a distributed file system
    • FileSystem declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS, and getKind, which subclasses must implement
    • FileSystem also provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS, as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem, and getDefaultFsUri; instances obtained via get are cached per scheme + authority
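    The per-scheme caching that getUnguardedFileSystem performs with its FSKey can be sketched with a small stand-alone class. Note this is an illustrative re-creation, not Flink's actual FSKey implementation (FsCacheSketch is a hypothetical name, and the cached values here are plain marker objects instead of real FileSystem instances): two URIs with the same scheme and authority share one cached instance, while a different authority yields a new one.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of FileSystem's cache keyed by (scheme, authority).
public class FsCacheSketch {

    // Cache key: only scheme + authority matter, never the path.
    static final class FsKey {
        final String scheme;
        final String authority; // may be null, e.g. for "file:///"

        FsKey(String scheme, String authority) {
            this.scheme = scheme;
            this.authority = authority;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FsKey)) {
                return false;
            }
            FsKey k = (FsKey) o;
            return Objects.equals(scheme, k.scheme) && Objects.equals(authority, k.authority);
        }

        @Override
        public int hashCode() {
            return Objects.hash(scheme, authority);
        }
    }

    private final Map<FsKey, Object> cache = new HashMap<>();

    // Returns the cached instance for the URI's scheme + authority,
    // creating a new marker object only on the first access.
    public Object get(URI uri) {
        return cache.computeIfAbsent(
                new FsKey(uri.getScheme(), uri.getAuthority()), k -> new Object());
    }

    public static void main(String[] args) {
        FsCacheSketch cache = new FsCacheSketch();
        Object a = cache.get(URI.create("hdfs://namenode:9000/user/a.txt"));
        Object b = cache.get(URI.create("hdfs://namenode:9000/other/b.txt"));
        Object c = cache.get(URI.create("hdfs://backup:9000/user/a.txt"));
        System.out.println(a == b); // same scheme + authority -> same instance
        System.out.println(a == c); // different authority -> different instance
    }
}
```

    This also explains why Flink's initialize(Configuration) clears the cache: after reconfiguration, previously cached instances might no longer match the new settings.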

    LocalFileSystem

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java

    @Internal
    public class LocalFileSystem extends FileSystem {
    
        private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
    
        /** The URI representing the local file system. */
        private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
    
        /** The shared instance of the local file system. */
        private static final LocalFileSystem INSTANCE = new LocalFileSystem();
    
        /** Path pointing to the current working directory.
         * Because Paths are not immutable, we cannot cache the proper path here */
        private final URI workingDir;
    
        /** Path pointing to the user's home directory.
         * Because Paths are not immutable, we cannot cache the proper path here. */
        private final URI homeDir;
    
        /** The host name of this machine. */
        private final String hostName;
    
        /**
         * Constructs a new <code>LocalFileSystem</code> object.
         */
        public LocalFileSystem() {
            this.workingDir = new File(System.getProperty("user.dir")).toURI();
            this.homeDir = new File(System.getProperty("user.home")).toURI();
    
            String tmp = "unknownHost";
            try {
                tmp = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                LOG.error("Could not resolve local host", e);
            }
            this.hostName = tmp;
        }
    
        // ------------------------------------------------------------------------
    
        @Override
        public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
            return new BlockLocation[] {
                    new LocalBlockLocation(hostName, file.getLen())
            };
        }
    
        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            final File path = pathToFile(f);
            if (path.exists()) {
                return new LocalFileStatus(path, this);
            }
            else {
                throw new FileNotFoundException("File " + f + " does not exist or the user running "
                        + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
            }
        }
    
        @Override
        public URI getUri() {
            return LOCAL_URI;
        }
    
        @Override
        public Path getWorkingDirectory() {
            return new Path(workingDir);
        }
    
        @Override
        public Path getHomeDirectory() {
            return new Path(homeDir);
        }
    
        @Override
        public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
            return open(f);
        }
    
        @Override
        public FSDataInputStream open(final Path f) throws IOException {
            final File file = pathToFile(f);
            return new LocalDataInputStream(file);
        }
    
        @Override
        public LocalRecoverableWriter createRecoverableWriter() throws IOException {
            return new LocalRecoverableWriter(this);
        }
    
        @Override
        public boolean exists(Path f) throws IOException {
            final File path = pathToFile(f);
            return path.exists();
        }
    
        @Override
        public FileStatus[] listStatus(final Path f) throws IOException {
    
            final File localf = pathToFile(f);
            FileStatus[] results;
    
            if (!localf.exists()) {
                return null;
            }
            if (localf.isFile()) {
                return new FileStatus[] { new LocalFileStatus(localf, this) };
            }
    
            final String[] names = localf.list();
            if (names == null) {
                return null;
            }
            results = new FileStatus[names.length];
            for (int i = 0; i < names.length; i++) {
                results[i] = getFileStatus(new Path(f, names[i]));
            }
    
            return results;
        }
    
        @Override
        public boolean delete(final Path f, final boolean recursive) throws IOException {
    
            final File file = pathToFile(f);
            if (file.isFile()) {
                return file.delete();
            } else if ((!recursive) && file.isDirectory()) {
                File[] containedFiles = file.listFiles();
                if (containedFiles == null) {
                    throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
                } else if (containedFiles.length != 0) {
                    throw new IOException("Directory " + file.toString() + " is not empty");
                }
            }
    
            return delete(file);
        }
    
        /**
         * Deletes the given file or directory.
         *
         * @param f
         *        the file to be deleted
         * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
         * @throws IOException
         *         thrown if an error occurred while deleting the files/directories
         */
        private boolean delete(final File f) throws IOException {
    
            if (f.isDirectory()) {
                final File[] files = f.listFiles();
                if (files != null) {
                    for (File file : files) {
                        final boolean del = delete(file);
                        if (!del) {
                            return false;
                        }
                    }
                }
            } else {
                return f.delete();
            }
    
            // Now directory is empty
            return f.delete();
        }
    
        /**
         * Recursively creates the directory specified by the provided path.
         *
         * @return <code>true</code>if the directories either already existed or have been created successfully,
         *         <code>false</code> otherwise
         * @throws IOException
         *         thrown if an error occurred while creating the directory/directories
         */
        @Override
        public boolean mkdirs(final Path f) throws IOException {
            checkNotNull(f, "path is null");
            return mkdirsInternal(pathToFile(f));
        }
    
        private boolean mkdirsInternal(File file) throws IOException {
            if (file.isDirectory()) {
                    return true;
            }
            else if (file.exists() && !file.isDirectory()) {
                // Important: The 'exists()' check above must come before the 'isDirectory()' check to
                //            be safe when multiple parallel instances try to create the directory
    
                // exists and is not a directory -> is a regular file
                throw new FileAlreadyExistsException(file.getAbsolutePath());
            }
            else {
                File parent = file.getParentFile();
                return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
            }
        }
    
        @Override
        public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
            checkNotNull(filePath, "filePath");
    
            if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
                throw new FileAlreadyExistsException("File already exists: " + filePath);
            }
    
            final Path parent = filePath.getParent();
            if (parent != null && !mkdirs(parent)) {
                throw new IOException("Mkdirs failed to create " + parent);
            }
    
            final File file = pathToFile(filePath);
            return new LocalDataOutputStream(file);
        }
    
        @Override
        public boolean rename(final Path src, final Path dst) throws IOException {
            final File srcFile = pathToFile(src);
            final File dstFile = pathToFile(dst);
    
            final File dstParent = dstFile.getParentFile();
    
            // Files.move fails if the destination directory doesn't exist
            //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
            dstParent.mkdirs();
    
            try {
                Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
                return true;
            }
            catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
                // catch the errors that are regular "move failed" exceptions and return false
                return false;
            }
        }
    
        @Override
        public boolean isDistributedFS() {
            return false;
        }
    
        @Override
        public FileSystemKind getKind() {
            return FileSystemKind.FILE_SYSTEM;
        }
    
        // ------------------------------------------------------------------------
    
        /**
         * Converts the given Path to a File for this file system.
         *
         * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
         */
        public File pathToFile(Path path) {
            if (!path.isAbsolute()) {
                path = new Path(getWorkingDirectory(), path);
            }
            return new File(path.toUri().getPath());
        }
    
        // ------------------------------------------------------------------------
    
        /**
         * Gets the URI that represents the local file system.
         * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
         * UNIX family platforms.
         *
         * @return The URI that represents the local file system.
         */
        public static URI getLocalFsURI() {
            return LOCAL_URI;
        }
    
        /**
         * Gets the shared instance of this file system.
         *
         * @return The shared instance of this file system.
         */
        public static LocalFileSystem getSharedInstance() {
            return INSTANCE;
        }
    }
    
    • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false, and its getKind method returns FileSystemKind.FILE_SYSTEM
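    LocalFileSystem's pathToFile resolves relative paths against the process working directory before converting them to java.io.File. The same behavior can be sketched with only JDK classes (PathToFileSketch is a hypothetical name and uses java.net.URI in place of Flink's Path):

```java
import java.io.File;
import java.net.URI;

// Illustrative sketch (not Flink API) of how a relative path is interpreted
// against the working directory, the way LocalFileSystem.pathToFile does.
public class PathToFileSketch {

    // Working directory as a file: URI, mirroring LocalFileSystem's constructor.
    private final URI workingDir = new File(System.getProperty("user.dir")).toURI();

    public File pathToFile(String path) {
        URI uri = URI.create(path);
        if (!uri.isAbsolute()) {
            // Relative paths are resolved against the working directory.
            uri = workingDir.resolve(uri);
        }
        return new File(uri.getPath());
    }

    public static void main(String[] args) {
        PathToFileSketch fs = new PathToFileSketch();
        // A relative input becomes an absolute File under the working directory.
        System.out.println(fs.pathToFile("data/out.txt").isAbsolute()); // true on Unix-like systems
        // An absolute file: URI keeps its own path.
        System.out.println(fs.pathToFile("file:///tmp/out.txt"));
    }
}
```

    File.toURI() always ends directory URIs with a slash, which is what makes URI.resolve append the relative path instead of replacing the last segment.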

    HadoopFileSystem

    flink-1.7.2/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java

    public class HadoopFileSystem extends FileSystem {
    
        /** The wrapped Hadoop File System. */
        private final org.apache.hadoop.fs.FileSystem fs;
    
        /* This field caches the file system kind. It is lazily set because the file system
        * URL is lazily initialized. */
        private FileSystemKind fsKind;
    
    
        /**
         * Wraps the given Hadoop File System object as a Flink File System object.
         * The given Hadoop file system object is expected to be initialized already.
         *
         * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
         */
        public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
            this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
        }
    
        /**
         * Gets the underlying Hadoop FileSystem.
         * @return The underlying Hadoop FileSystem.
         */
        public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
            return this.fs;
        }
    
        // ------------------------------------------------------------------------
        //  file system methods
        // ------------------------------------------------------------------------
    
        @Override
        public Path getWorkingDirectory() {
            return new Path(this.fs.getWorkingDirectory().toUri());
        }
    
        public Path getHomeDirectory() {
            return new Path(this.fs.getHomeDirectory().toUri());
        }
    
        @Override
        public URI getUri() {
            return fs.getUri();
        }
    
        @Override
        public FileStatus getFileStatus(final Path f) throws IOException {
            org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
            return new HadoopFileStatus(status);
        }
    
        @Override
        public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
                throws IOException {
            if (!(file instanceof HadoopFileStatus)) {
                throw new IOException("file is not an instance of DistributedFileStatus");
            }
    
            final HadoopFileStatus f = (HadoopFileStatus) file;
    
            final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
                start, len);
    
            // Wrap up HDFS specific block location objects
            final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
            for (int i = 0; i < distBlkLocations.length; i++) {
                distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
            }
    
            return distBlkLocations;
        }
    
        @Override
        public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
            final org.apache.hadoop.fs.Path path = toHadoopPath(f);
            final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
            return new HadoopDataInputStream(fdis);
        }
    
        @Override
        public HadoopDataInputStream open(final Path f) throws IOException {
            final org.apache.hadoop.fs.Path path = toHadoopPath(f);
            final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
            return new HadoopDataInputStream(fdis);
        }
    
        @Override
        @SuppressWarnings("deprecation")
        public HadoopDataOutputStream create(
                final Path f,
                final boolean overwrite,
                final int bufferSize,
                final short replication,
                final long blockSize) throws IOException {
    
            final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
                    toHadoopPath(f), overwrite, bufferSize, replication, blockSize);
            return new HadoopDataOutputStream(fdos);
        }
    
        @Override
        public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
            final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
                    this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
            return new HadoopDataOutputStream(fsDataOutputStream);
        }
    
        @Override
        public boolean delete(final Path f, final boolean recursive) throws IOException {
            return this.fs.delete(toHadoopPath(f), recursive);
        }
    
        @Override
        public boolean exists(Path f) throws IOException {
            return this.fs.exists(toHadoopPath(f));
        }
    
        @Override
        public FileStatus[] listStatus(final Path f) throws IOException {
            final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f));
            final FileStatus[] files = new FileStatus[hadoopFiles.length];
    
            // Convert types
            for (int i = 0; i < files.length; i++) {
                files[i] = new HadoopFileStatus(hadoopFiles[i]);
            }
    
            return files;
        }
    
        @Override
        public boolean mkdirs(final Path f) throws IOException {
            return this.fs.mkdirs(toHadoopPath(f));
        }
    
        @Override
        public boolean rename(final Path src, final Path dst) throws IOException {
            return this.fs.rename(toHadoopPath(src), toHadoopPath(dst));
        }
    
        @SuppressWarnings("deprecation")
        @Override
        public long getDefaultBlockSize() {
            return this.fs.getDefaultBlockSize();
        }
    
        @Override
        public boolean isDistributedFS() {
            return true;
        }
    
        @Override
        public FileSystemKind getKind() {
            if (fsKind == null) {
                fsKind = getKindForScheme(this.fs.getUri().getScheme());
            }
            return fsKind;
        }
    
        @Override
        public RecoverableWriter createRecoverableWriter() throws IOException {
            // This writer is only supported on a subset of file systems, and on
            // specific versions. We check these schemes and versions eagerly for better error
            // messages in the constructor of the writer.
            return new HadoopRecoverableWriter(fs);
        }
    
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------
    
        public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
            return new org.apache.hadoop.fs.Path(path.toUri());
        }
    
        /**
         * Gets the kind of the file system from its scheme.
         *
         * <p>Implementation note: Initially, especially within the Flink 1.3.x line
         * (in order to not break backwards compatibility), we must only label file systems
         * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
         * Otherwise, we cause regression for example in the performance and cleanup handling
         * of checkpoints.
         * For that reason, we initially mark some filesystems as 'eventually consistent' or
         * as 'object stores', and leave the others as 'consistent file systems'.
         */
        static FileSystemKind getKindForScheme(String scheme) {
            scheme = scheme.toLowerCase(Locale.US);
    
            if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
                // the Amazon S3 storage
                return FileSystemKind.OBJECT_STORE;
            }
            else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
                // file servers instead of file systems
                // they might actually be consistent, but we have no hard guarantees
                // currently to rely on that
                return FileSystemKind.OBJECT_STORE;
            }
            else {
                // the remainder should include hdfs, kosmos, ceph, ...
                // this also includes federated HDFS (viewfs).
                return FileSystemKind.FILE_SYSTEM;
            }
        }
    
    }
    
    • HadoopFileSystem extends FileSystem and wraps a Hadoop org.apache.hadoop.fs.FileSystem (typically HDFS); its isDistributedFS method returns true, and its getKind method returns either FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem
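    The scheme-to-kind mapping in getKindForScheme can be exercised on its own. Here is a standalone re-implementation copying the branching above (class and enum names are local to this sketch, not Flink's):

    ```java
    import java.util.Locale;

    public class SchemeKindSketch {

        enum FileSystemKind { FILE_SYSTEM, OBJECT_STORE }

        // Same branching as HadoopFileSystem#getKindForScheme above.
        static FileSystemKind kindForScheme(String scheme) {
            scheme = scheme.toLowerCase(Locale.US);
            if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
                // the Amazon S3 storage (covers s3, s3a, s3n, emrfs)
                return FileSystemKind.OBJECT_STORE;
            } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
                // file servers rather than file systems: no consistency guarantees
                return FileSystemKind.OBJECT_STORE;
            } else {
                // hdfs, viewfs, ceph, ... are treated as consistent file systems
                return FileSystemKind.FILE_SYSTEM;
            }
        }

        public static void main(String[] args) {
            System.out.println(kindForScheme("hdfs"));  // FILE_SYSTEM
            System.out.println(kindForScheme("s3a"));   // OBJECT_STORE
        }
    }
    ```

    Because the match is prefix-based, variants such as s3a and s3n are classified as object stores without being listed individually, while any unrecognized scheme defaults to FILE_SYSTEM.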

    Summary

    • FileSystem is the abstract base class for the file systems Flink uses; a subclass may be backed by a local or a distributed file system. It declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS, and getKind for subclasses to implement; it also provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS, plus the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem, and getDefaultFsUri
    • LocalFileSystem extends FileSystem and is backed by the local file system; its isDistributedFS method returns false, and its getKind method returns FileSystemKind.FILE_SYSTEM
    • HadoopFileSystem extends FileSystem and wraps a Hadoop org.apache.hadoop.fs.FileSystem (typically HDFS); its isDistributedFS method returns true, and its getKind method returns either FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem
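    The WriteMode contract defined on the base class (NO_OVERWRITE fails if the target already exists; OVERWRITE replaces it) can be mimicked with plain java.nio. This is a standalone sketch of the semantics, not Flink's implementation; all names here are mine:

    ```java
    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class WriteModeSketch {

        enum WriteMode { NO_OVERWRITE, OVERWRITE }

        // NO_OVERWRITE: creating an existing target fails with
        // FileAlreadyExistsException. OVERWRITE: delete any existing
        // target first, then create it fresh.
        static Path create(Path target, WriteMode mode) throws IOException {
            if (mode == WriteMode.NO_OVERWRITE) {
                return Files.createFile(target);
            }
            Files.deleteIfExists(target);
            return Files.createFile(target);
        }

        public static void main(String[] args) throws IOException {
            Path dir = Files.createTempDirectory("writemode");
            Path f = dir.resolve("out.txt");
            create(f, WriteMode.NO_OVERWRITE);   // succeeds: file did not exist
            create(f, WriteMode.OVERWRITE);      // succeeds: replaces existing file
            try {
                create(f, WriteMode.NO_OVERWRITE);
            } catch (FileAlreadyExistsException e) {
                System.out.println("NO_OVERWRITE refused existing file");
            }
        }
    }
    ```

    Flink's real create(Path, WriteMode) additionally deletes directories recursively under OVERWRITE, as the WriteMode javadoc at the top of FileSystem states; this sketch only covers the single-file case.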

      Link: https://www.haomeiwen.com/subject/thheuqtx.html