序
本文主要研究一下flink的FileSystem
FileSystem
flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@Public
public abstract class FileSystem {
/**
* The possible write modes. The write mode decides what happens if a file should be created,
* but already exists.
*/
public enum WriteMode {
/** Creates the target file only if no file exists at that path already.
* Does not overwrite existing files and directories. */
NO_OVERWRITE,
/** Creates a new target file regardless of any existing files or directories.
* Existing files and directories will be deleted (recursively) automatically before
* creating the new file. */
OVERWRITE
}
// ------------------------------------------------------------------------
/** Logger for all FileSystem work. */
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
/** Object used to protect calls to specific methods.*/
private static final ReentrantLock LOCK = new ReentrantLock(true);
/** Cache for file systems, by scheme + authority. */
private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
/** All available file system factories. */
private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
/** Mapping of file system schemes to the corresponding factories,
* populated in {@link FileSystem#initialize(Configuration)}. */
private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
/** The default factory that is used when no scheme matches. */
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
private static URI defaultScheme;
//......
// ------------------------------------------------------------------------
// Initialization
// ------------------------------------------------------------------------
/**
* Initializes the shared file system settings.
*
* <p>The given configuration is passed to each file system factory to initialize the respective
* file systems. Because the configuration of file systems may be different subsequent to the call
* of this method, this method clears the file system instance cache.
*
* <p>This method also reads the default file system URI from the configuration key
* {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
* the URI has no scheme will be interpreted as relative to that URI.
* As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
* {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
*
* @param config the configuration from where to fetch the parameter.
*/
public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
LOCK.lock();
try {
// make sure file systems are re-instantiated after re-configuration
CACHE.clear();
FS_FACTORIES.clear();
// configure all file system factories
for (FileSystemFactory factory : RAW_FACTORIES) {
factory.configure(config);
String scheme = factory.getScheme();
FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
FS_FACTORIES.put(scheme, fsf);
}
// configure the default (fallback) factory
FALLBACK_FACTORY.configure(config);
// also read the default file system scheme
final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
if (stringifiedUri == null) {
defaultScheme = null;
}
else {
try {
defaultScheme = new URI(stringifiedUri);
}
catch (URISyntaxException e) {
throw new IllegalConfigurationException("The default file system scheme ('" +
CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
}
}
}
finally {
LOCK.unlock();
}
}
// ------------------------------------------------------------------------
// Obtaining File System Instances
// ------------------------------------------------------------------------
public static FileSystem getLocalFileSystem() {
return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance());
}
public static FileSystem get(URI uri) throws IOException {
return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}
@Internal
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
checkNotNull(fsUri, "file system URI");
LOCK.lock();
try {
final URI uri;
if (fsUri.getScheme() != null) {
uri = fsUri;
}
else {
// Apply the default fs scheme
final URI defaultUri = getDefaultFsUri();
URI rewrittenUri = null;
try {
rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
defaultUri.getPort(), fsUri.getPath(), null, null);
}
catch (URISyntaxException e) {
// for local URIs, we make one more try to repair the path by making it absolute
if (defaultUri.getScheme().equals("file")) {
try {
rewrittenUri = new URI(
"file", null,
new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
null);
} catch (URISyntaxException ignored) {
// could not help it...
}
}
}
if (rewrittenUri != null) {
uri = rewrittenUri;
}
else {
throw new IOException("The file system URI '" + fsUri +
"' declares no scheme and cannot be interpreted relative to the default file system URI ("
+ defaultUri + ").");
}
}
// print a helpful pointer for malformed local URIs (happens a lot to new users)
if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '"
+ uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')");
}
final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());
// See if there is a file system object in the cache
{
FileSystem cached = CACHE.get(key);
if (cached != null) {
return cached;
}
}
// this "default" initialization makes sure that the FileSystem class works
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
initialize(new Configuration());
}
// Try to create a new file system
final FileSystem fs;
final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
if (factory != null) {
fs = factory.create(uri);
}
else {
try {
fs = FALLBACK_FACTORY.create(uri);
}
catch (UnsupportedFileSystemSchemeException e) {
throw new UnsupportedFileSystemSchemeException(
"Could not find a file system implementation for scheme '" + uri.getScheme() +
"'. The scheme is not directly supported by Flink and no Hadoop file " +
"system to support this scheme could be loaded.", e);
}
}
CACHE.put(key, fs);
return fs;
}
finally {
LOCK.unlock();
}
}
public static URI getDefaultFsUri() {
return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
}
// ------------------------------------------------------------------------
// File System Methods
// ------------------------------------------------------------------------
public abstract Path getWorkingDirectory();
public abstract Path getHomeDirectory();
public abstract URI getUri();
public abstract FileStatus getFileStatus(Path f) throws IOException;
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
public abstract FSDataInputStream open(Path f) throws IOException;
public RecoverableWriter createRecoverableWriter() throws IOException {
throw new UnsupportedOperationException("This file system does not support recoverable writers.");
}
public abstract FileStatus[] listStatus(Path f) throws IOException;
public boolean exists(final Path f) throws IOException {
try {
return (getFileStatus(f) != null);
} catch (FileNotFoundException e) {
return false;
}
}
public abstract boolean delete(Path f, boolean recursive) throws IOException;
public abstract boolean mkdirs(Path f) throws IOException;
public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
public abstract boolean rename(Path src, Path dst) throws IOException;
public abstract boolean isDistributedFS();
public abstract FileSystemKind getKind();
// ------------------------------------------------------------------------
// output directory initialization
// ------------------------------------------------------------------------
public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if (isDistributedFS()) {
return false;
}
// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
// concurrently work in this method (multiple output formats writing locally) might end
// up deleting each other's directories and leave non-retrievable files, without necessarily
// causing an exception. That results in very subtle issues, like output files looking as if
// they are not getting created.
// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
// here can cancel faster
try {
OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
}
catch (InterruptedException e) {
// restore the interruption state
Thread.currentThread().interrupt();
// leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}
try {
FileStatus status;
try {
status = getFileStatus(outPath);
}
catch (FileNotFoundException e) {
// okay, the file is not there
status = null;
}
// check if path exists
if (status != null) {
// path exists, check write mode
switch (writeMode) {
case NO_OVERWRITE:
if (status.isDir() && createDirectory) {
return true;
} else {
// file may not be overwritten
throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
}
case OVERWRITE:
if (status.isDir()) {
if (createDirectory) {
// directory exists and does not need to be created
return true;
} else {
// we will write in a single file, delete directory
try {
delete(outPath, true);
}
catch (IOException e) {
throw new IOException("Could not remove existing directory '" + outPath +
"' to allow overwrite by result file", e);
}
}
}
else {
// delete file
try {
delete(outPath, false);
}
catch (IOException e) {
throw new IOException("Could not remove existing file '" + outPath +
"' to allow overwrite by result file/directory", e);
}
}
break;
default:
throw new IllegalArgumentException("Invalid write mode: " + writeMode);
}
}
if (createDirectory) {
// Output directory needs to be created
if (!exists(outPath)) {
mkdirs(outPath);
}
// double check that the output directory exists
try {
return getFileStatus(outPath).isDir();
}
catch (FileNotFoundException e) {
return false;
}
}
else {
// check that the output path does not exist and an output file
// can be created by the output format.
return !exists(outPath);
}
}
finally {
OUTPUT_DIRECTORY_INIT_LOCK.unlock();
}
}
public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if (!isDistributedFS()) {
return false;
}
// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
// concurrently work in this method (multiple output formats writing locally) might end
// up deleting each other's directories and leave non-retrievable files, without necessarily
// causing an exception. That results in very subtle issues, like output files looking as if
// they are not getting created.
// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
// here can cancel faster
try {
OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
}
catch (InterruptedException e) {
// restore the interruption state
Thread.currentThread().interrupt();
// leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}
try {
// check if path exists
if (exists(outPath)) {
// path exists, check write mode
switch(writeMode) {
case NO_OVERWRITE:
// file or directory may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");
case OVERWRITE:
// output path exists. We delete it and all contained files in case of a directory.
try {
delete(outPath, true);
} catch (IOException e) {
// Some other thread might already have deleted the path.
// If - for some other reason - the path could not be deleted,
// this will be handled later.
}
break;
default:
throw new IllegalArgumentException("Invalid write mode: " + writeMode);
}
}
if (createDirectory) {
// Output directory needs to be created
try {
if (!exists(outPath)) {
mkdirs(outPath);
}
} catch (IOException ioe) {
// Some other thread might already have created the directory.
// If - for some other reason - the directory could not be created
// and the path does not exist, this will be handled later.
}
// double check that the output directory exists
return exists(outPath) && getFileStatus(outPath).isDir();
}
else {
// single file case: check that the output path does not exist and
// an output file can be created by the output format.
return !exists(outPath);
}
}
finally {
OUTPUT_DIRECTORY_INIT_LOCK.unlock();
}
}
//......
}
- FileSystem是flink使用的文件系统的抽象基类,子类实现的可以是本地文件系统或者分布式文件系统
- FileSystem定义了getWorkingDirectory、getHomeDirectory、getUri、getFileStatus、getFileBlockLocations、open、listStatus、delete、mkdirs、create、rename、isDistributedFS、getKind这几个抽象方法要求子类实现
- FileSystem提供了initOutPathLocalFS、initOutPathDistFS这几个已经实现的实例方法以及initialize、getLocalFileSystem、get、getUnguardedFileSystem、getDefaultFsUri这几个静态方法
LocalFileSystem
flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@Internal
public class LocalFileSystem extends FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
/** The URI representing the local file system. */
private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
/** The shared instance of the local file system. */
private static final LocalFileSystem INSTANCE = new LocalFileSystem();
/** Path pointing to the current working directory.
* Because Paths are not immutable, we cannot cache the proper path here */
private final URI workingDir;
/** Path pointing to the current working directory.
* Because Paths are not immutable, we cannot cache the proper path here. */
private final URI homeDir;
/** The host name of this machine. */
private final String hostName;
/**
* Constructs a new <code>LocalFileSystem</code> object.
*/
public LocalFileSystem() {
this.workingDir = new File(System.getProperty("user.dir")).toURI();
this.homeDir = new File(System.getProperty("user.home")).toURI();
String tmp = "unknownHost";
try {
tmp = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Could not resolve local host", e);
}
this.hostName = tmp;
}
// ------------------------------------------------------------------------
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return new BlockLocation[] {
new LocalBlockLocation(hostName, file.getLen())
};
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
final File path = pathToFile(f);
if (path.exists()) {
return new LocalFileStatus(path, this);
}
else {
throw new FileNotFoundException("File " + f + " does not exist or the user running "
+ "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
}
}
@Override
public URI getUri() {
return LOCAL_URI;
}
@Override
public Path getWorkingDirectory() {
return new Path(workingDir);
}
@Override
public Path getHomeDirectory() {
return new Path(homeDir);
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
return open(f);
}
@Override
public FSDataInputStream open(final Path f) throws IOException {
final File file = pathToFile(f);
return new LocalDataInputStream(file);
}
@Override
public LocalRecoverableWriter createRecoverableWriter() throws IOException {
return new LocalRecoverableWriter(this);
}
@Override
public boolean exists(Path f) throws IOException {
final File path = pathToFile(f);
return path.exists();
}
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
final File localf = pathToFile(f);
FileStatus[] results;
if (!localf.exists()) {
return null;
}
if (localf.isFile()) {
return new FileStatus[] { new LocalFileStatus(localf, this) };
}
final String[] names = localf.list();
if (names == null) {
return null;
}
results = new FileStatus[names.length];
for (int i = 0; i < names.length; i++) {
results[i] = getFileStatus(new Path(f, names[i]));
}
return results;
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
final File file = pathToFile(f);
if (file.isFile()) {
return file.delete();
} else if ((!recursive) && file.isDirectory()) {
File[] containedFiles = file.listFiles();
if (containedFiles == null) {
throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
} else if (containedFiles.length != 0) {
throw new IOException("Directory " + file.toString() + " is not empty");
}
}
return delete(file);
}
/**
* Deletes the given file or directory.
*
* @param f
* the file to be deleted
* @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
* @throws IOException
* thrown if an error occurred while deleting the files/directories
*/
private boolean delete(final File f) throws IOException {
if (f.isDirectory()) {
final File[] files = f.listFiles();
if (files != null) {
for (File file : files) {
final boolean del = delete(file);
if (!del) {
return false;
}
}
}
} else {
return f.delete();
}
// Now directory is empty
return f.delete();
}
/**
* Recursively creates the directory specified by the provided path.
*
* @return <code>true</code>if the directories either already existed or have been created successfully,
* <code>false</code> otherwise
* @throws IOException
* thrown if an error occurred while creating the directory/directories
*/
@Override
public boolean mkdirs(final Path f) throws IOException {
checkNotNull(f, "path is null");
return mkdirsInternal(pathToFile(f));
}
private boolean mkdirsInternal(File file) throws IOException {
if (file.isDirectory()) {
return true;
}
else if (file.exists() && !file.isDirectory()) {
// Important: The 'exists()' check above must come before the 'isDirectory()' check to
// be safe when multiple parallel instances try to create the directory
// exists and is not a directory -> is a regular file
throw new FileAlreadyExistsException(file.getAbsolutePath());
}
else {
File parent = file.getParentFile();
return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
}
}
@Override
public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
checkNotNull(filePath, "filePath");
if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
throw new FileAlreadyExistsException("File already exists: " + filePath);
}
final Path parent = filePath.getParent();
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent);
}
final File file = pathToFile(filePath);
return new LocalDataOutputStream(file);
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
final File srcFile = pathToFile(src);
final File dstFile = pathToFile(dst);
final File dstParent = dstFile.getParentFile();
// Files.move fails if the destination directory doesn't exist
//noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
dstParent.mkdirs();
try {
Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return true;
}
catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
// catch the errors that are regular "move failed" exceptions and return false
return false;
}
}
@Override
public boolean isDistributedFS() {
return false;
}
@Override
public FileSystemKind getKind() {
return FileSystemKind.FILE_SYSTEM;
}
// ------------------------------------------------------------------------
/**
* Converts the given Path to a File for this file system.
*
* <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
*/
public File pathToFile(Path path) {
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
}
return new File(path.toUri().getPath());
}
// ------------------------------------------------------------------------
/**
* Gets the URI that represents the local file system.
* That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
* UNIX family platforms.
*
* @return The URI that represents the local file system.
*/
public static URI getLocalFsURI() {
return LOCAL_URI;
}
/**
* Gets the shared instance of this file system.
*
* @return The shared instance of this file system.
*/
public static LocalFileSystem getSharedInstance() {
return INSTANCE;
}
}
- LocalFileSystem继承了FileSystem,它使用的是本地文件系统来实现,其isDistributedFS方法返回的false;getKind方法返回的是FileSystemKind.FILE_SYSTEM
HadoopFileSystem
flink-1.7.2/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
public class HadoopFileSystem extends FileSystem {
/** The wrapped Hadoop File System. */
private final org.apache.hadoop.fs.FileSystem fs;
/* This field caches the file system kind. It is lazily set because the file system
* URL is lazily initialized. */
private FileSystemKind fsKind;
/**
* Wraps the given Hadoop File System object as a Flink File System object.
* The given Hadoop file system object is expected to be initialized already.
*
* @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
*/
public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
}
/**
* Gets the underlying Hadoop FileSystem.
* @return The underlying Hadoop FileSystem.
*/
public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
return this.fs;
}
// ------------------------------------------------------------------------
// file system methods
// ------------------------------------------------------------------------
@Override
public Path getWorkingDirectory() {
return new Path(this.fs.getWorkingDirectory().toUri());
}
public Path getHomeDirectory() {
return new Path(this.fs.getHomeDirectory().toUri());
}
@Override
public URI getUri() {
return fs.getUri();
}
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
return new HadoopFileStatus(status);
}
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
throws IOException {
if (!(file instanceof HadoopFileStatus)) {
throw new IOException("file is not an instance of DistributedFileStatus");
}
final HadoopFileStatus f = (HadoopFileStatus) file;
final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
start, len);
// Wrap up HDFS specific block location objects
final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
for (int i = 0; i < distBlkLocations.length; i++) {
distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
}
return distBlkLocations;
}
@Override
public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
final org.apache.hadoop.fs.Path path = toHadoopPath(f);
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
return new HadoopDataInputStream(fdis);
}
@Override
public HadoopDataInputStream open(final Path f) throws IOException {
final org.apache.hadoop.fs.Path path = toHadoopPath(f);
final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
return new HadoopDataInputStream(fdis);
}
@Override
@SuppressWarnings("deprecation")
public HadoopDataOutputStream create(
final Path f,
final boolean overwrite,
final int bufferSize,
final short replication,
final long blockSize) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
toHadoopPath(f), overwrite, bufferSize, replication, blockSize);
return new HadoopDataOutputStream(fdos);
}
@Override
public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
return new HadoopDataOutputStream(fsDataOutputStream);
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
return this.fs.delete(toHadoopPath(f), recursive);
}
@Override
public boolean exists(Path f) throws IOException {
return this.fs.exists(toHadoopPath(f));
}
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f));
final FileStatus[] files = new FileStatus[hadoopFiles.length];
// Convert types
for (int i = 0; i < files.length; i++) {
files[i] = new HadoopFileStatus(hadoopFiles[i]);
}
return files;
}
@Override
public boolean mkdirs(final Path f) throws IOException {
return this.fs.mkdirs(toHadoopPath(f));
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
return this.fs.rename(toHadoopPath(src), toHadoopPath(dst));
}
@SuppressWarnings("deprecation")
@Override
public long getDefaultBlockSize() {
return this.fs.getDefaultBlockSize();
}
@Override
public boolean isDistributedFS() {
return true;
}
@Override
public FileSystemKind getKind() {
if (fsKind == null) {
fsKind = getKindForScheme(this.fs.getUri().getScheme());
}
return fsKind;
}
@Override
public RecoverableWriter createRecoverableWriter() throws IOException {
// This writer is only supported on a subset of file systems, and on
// specific versions. We check these schemes and versions eagerly for better error
// messages in the constructor of the writer.
return new HadoopRecoverableWriter(fs);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
/**
* Gets the kind of the file system from its scheme.
*
* <p>Implementation note: Initially, especially within the Flink 1.3.x line
* (in order to not break backwards compatibility), we must only label file systems
* as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
* Otherwise, we cause regression for example in the performance and cleanup handling
* of checkpoints.
* For that reason, we initially mark some filesystems as 'eventually consistent' or
* as 'object stores', and leave the others as 'consistent file systems'.
*/
static FileSystemKind getKindForScheme(String scheme) {
scheme = scheme.toLowerCase(Locale.US);
if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
// the Amazon S3 storage
return FileSystemKind.OBJECT_STORE;
}
else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
// file servers instead of file systems
// they might actually be consistent, but we have no hard guarantees
// currently to rely on that
return FileSystemKind.OBJECT_STORE;
}
else {
// the remainder should include hdfs, kosmos, ceph, ...
// this also includes federated HDFS (viewfs).
return FileSystemKind.FILE_SYSTEM;
}
}
}
- HadoopFileSystem继承了FileSystem,它使用的是HDFS文件系统来实现,其isDistributedFS方法返回的true;getKind方法返回的是FileSystemKind.FILE_SYSTEM或者FileSystemKind.OBJECT_STORE;FlinkS3FileSystem及MapRFileSystem都继承至HadoopFileSystem
小结
- FileSystem是flink使用的文件系统的抽象基类,子类实现的可以是本地文件系统或者分布式文件系统;它定义了getWorkingDirectory、getHomeDirectory、getUri、getFileStatus、getFileBlockLocations、open、listStatus、delete、mkdirs、create、rename、isDistributedFS、getKind这几个抽象方法要求子类实现;提供了initOutPathLocalFS、initOutPathDistFS这几个已经实现的实例方法以及initialize、getLocalFileSystem、get、getUnguardedFileSystem、getDefaultFsUri这几个静态方法
- LocalFileSystem继承了FileSystem,它使用的是本地文件系统来实现,其isDistributedFS方法返回的false;getKind方法返回的是FileSystemKind.FILE_SYSTEM
- HadoopFileSystem继承了FileSystem,它使用的是HDFS文件系统来实现,其isDistributedFS方法返回的true;getKind方法返回的是FileSystemKind.FILE_SYSTEM或者FileSystemKind.OBJECT_STORE;FlinkS3FileSystem及MapRFileSystem都继承至HadoopFileSystem
网友评论