美文网首页
HDFS中API的使用

HDFS中API的使用

作者: lizb | 来源:发表于2019-05-23 13:54 被阅读0次

    在项目开发中,有时我们需要通过HDFS的api来对文件进行操作,比如将数据上传到HDFS或者从HDFS获取数据等。本篇来介绍一下HDFS中API的具体使用。直接上代码:

    package com.lzb.hdfs.fs;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    
    import java.io.IOException;
    
    public class HDFSHelper {
    
        private FileSystem fs;
    
        public HDFSHelper() {
            fs = getFileSystem();
        }
    
        /**
         * Configuration是配置对象,conf可以理解为包含了所有配置信息的一个集合,可以认为是Map,
         * 在初始化的时候底层会加载一堆配置文件 core-site.xml;hdfs-site.xml;mapred-site.xml;yarn-site.xml
         * 如果需要项目代码自动加载配置文件中的信息,那么就必须把配置文件改成-default.xml或者-site.xml的名称,
         * 而且必须放置在src下,如果不叫这个名,或者不在src下,也需要加载这些配置文件中的参数,必须使用conf对象提供的方法手动加载.
         * 依次加载的参数信息的顺序是:
         * 1.加载core/hdfs/mapred/yarn-default.xml
         * 2.加载通过conf.addResource()加载的配置文件
         * 3.加载conf.set(name,value)
         */
        private Configuration getConfiguration(){
            Configuration conf = new Configuration();
            //conf.addResource("xxx");
            //conf.set("xxx","xxx");
            //Configuration.addDefaultResource("core-site.xml");
            //Configuration.addDefaultResource("hdfs-site.xml");
            //conf.set("fs.default.name","hdfs://probd01:8020");
    
            //HA模式的配置
            conf.set("fs.defaultFS", "hdfs://probd");
            conf.set("dfs.nameservices", "probd");
            conf.set("dfs.ha.namenodes.probd", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.probd.nn1", "probd01:8020");
            conf.set("dfs.namenode.rpc-address.probd.nn2", "probd02:8020");
            conf.set("dfs.client.failover.proxy.provider.probd", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    
            //防止报错:no FileSystem for scheme: hdfs...
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            return conf;
        }
    
        /**
         * 获取文件系统
         * 本地文件系统为LocalFileSystem,URL形式:    file:///c:myProgram
         * HDFS文件系统为DistributedFileSystem,URL形式:    fs.defaultFS=hdfs://hadoop01:9000
         */
        public FileSystem getFileSystem(){
            Configuration conf = getConfiguration();
            FileSystem fs = null;
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(fs);
            return fs;
        }
    
        /**
         * 上传本地文件到HDFS,底层就是采用流的方式
         * @param localPath 本地文件路径
         * @param remotePath HDFS文件路径
         * @return 是否上传成功
         */
        public boolean copyFromLocal(String localPath,String remotePath){
            if(fs == null) return false;
            try {
                fs.copyFromLocalFile(new Path(localPath),new Path(remotePath));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return true;
        }
    
        /**
         * 从HDFS下载文件,底层就是采用流的方式
         * @param remotePath HDFS文件路径
         * @param localPath 本地路径
         * @return 是否下载成功
         */
        public boolean copyToLocal(String remotePath,String localPath){
            if(fs == null) return false;
            try {
                fs.copyToLocalFile(new Path(remotePath),new Path(localPath));
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
        /**
         * 获取目录下的文件
         * @param remotePath HDFS文件路径
         * @param recursive 是否级联(该文件夹下面如果还有子文件 要不要看,注意没有 子文件夹!!)
         */
        public void listFiles(String remotePath,boolean recursive){
            if(fs == null) return;
            try {
                RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(remotePath), recursive);
                while (iterator.hasNext()){
                    LocatedFileStatus fileStatus = iterator.next();
    
                    //文件的存储路径,以hdfs://开头的全路径 ==> hdfs://hadoop01:9000/a/gg.txt
                    System.out.println( "file path === " + fileStatus.getPath());
    
                    //文件名
                    System.out.println("file name === " + fileStatus.getPath().getName());
    
                    //文件长度
                    System.out.println("file size === "+fileStatus.getLen());
    
                    //文件所有者
                    System.out.println("file owner === "+fileStatus.getOwner());
    
                    //分组信息
                    System.out.println("file group === " + fileStatus.getGroup());
    
                    //文件权限信息
                    System.out.println("file permission === " + fileStatus.getPermission());
    
                    //文件副本数
                    System.out.println("file blocks === " + fileStatus.getReplication());
    
                    //块大小
                    System.out.println("file block size === " + fileStatus.getBlockSize());
    
                    //块位置相关信息
                    BlockLocation[] blockLocations = fileStatus.getBlockLocations();
    
                    //块的数量
                    System.out.println("file block nums === " + blockLocations.length);
    
                    for (BlockLocation bl : blockLocations) {
                        String[] hosts = bl.getHosts();
                        for (String host: hosts) {
                            System.out.println("block host === " + host);
                        }
                        //块的一个逻辑路径
                        bl.getTopologyPaths();
                    }
    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
        /**
         * 获取目录下的文件
         * 此方法与listFiles不同,不支持传true或false,即不能级联,如果想实现级联就采用递归的方式
         * @param remotePath HDFS文件路径
         */
        public void listStatus(String remotePath){
            if(fs == null) return;
            try {
                FileStatus[] listStatus = fs.listStatus(new Path(remotePath));
                for (FileStatus fss : listStatus) {
                    //判断是不是文件夹
                    boolean directory = fss.isDirectory();
    
                    //判断是不是文件
                    boolean file = fss.isFile();
    
                    String name = fss.getPath().getName();
    
                    if(file) {
                        System.out.println(name+":文件");
                    }else {
                        System.out.println(name+":文件夹");
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 删除空文件夹或空文件
         * @param path
         */
        public void deleteEmptyDirAndFile(Path path){
            if(fs == null) return;
            try {
    
                FileStatus[] listStatus = fs.listStatus(path);
                if(listStatus.length == 0){
                    //删除空文件夹
                    fs.delete(path,true);
                    return;
                }
    
                RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);
    
                while (iterator.hasNext()) {
                    LocatedFileStatus next = iterator.next();
                    Path currentPath = next.getPath();
                    Path parentPath = next.getPath().getParent();
    
    
                    if (next.isDirectory()) {
                        // 如果是空文件夹
                        if (fs.listStatus(currentPath).length == 0) {
                            // 删除掉
                            fs.delete(currentPath, true);
                        } else {
                            // 不是空文件夹,那么则继续遍历
                            if (fs.exists(currentPath)) {
                                deleteEmptyDirAndFile(currentPath);
                            }
                        }
                    } else {
                        // 获取文件的长度
                        long fileLength = next.getLen();
                        // 当文件是空文件时, 删除
                        if (fileLength == 0) {
                            fs.delete(currentPath, true);
                        }
                    }
    
                    // 当空文件夹或者空文件删除时,有可能导致父文件夹为空文件夹,
                    // 所以每次删除一个空文件或者空文件的时候都需要判断一下,如果真是如此,那么就需要把该文件夹也删除掉
                    int length = fs.listStatus(parentPath).length;
                    if (length == 0) {
                        fs.delete(parentPath, true);
                    }
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
        /**
         * 创建文件夹
         * @param remotePath HDFS文件路径
         * @return 是否创建成功
         */
        public boolean mkdir(String remotePath){
            if(fs == null) return false;
            boolean success = false;
            try {
                success = fs.mkdirs(new Path(remotePath));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return success;
        }
    
        /**
         * 写入文件
         * @param remotePath HDFS文件路径
         * @param content 内容
         * @return 是否写入成功
         */
        public boolean writeToFile(String remotePath,String content){
            if(fs == null) return false;
            try {
                FSDataOutputStream out = fs.create(new Path(remotePath));
                out.writeUTF(content);
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
    
        }
    
        /**
         * 读取文件数据
         * @param remotePath HDFS文件路径
         * @return 读取的结果数据
         */
        public String readFromFile(String remotePath){
            String result = null;
            if(fs == null) return null;
            try {
    
                FSDataInputStream in = fs.open(new Path(remotePath));
                result = in.readUTF();
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        }
    
    
        /**
         * 重命名文件
         * @param oldPath 旧文件路径
         * @param newPath 新文件路径
         * @return 是否重命名成功
         */
        public boolean renameFile(String oldPath,String newPath){
            if(fs == null) return false;
            Path old=new Path(oldPath);
            Path now=new Path(newPath);
            boolean rename = false;
            try {
                rename = fs.rename(old, now);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return rename;
        }
    
        /**
         * 删除目录和文件
         * @param remotePath HDFS文件路径
         * @return 是否删除成功
         */
        public boolean deleteFile(String remotePath){
            if(fs == null) return false;
            boolean success = false;
            try {
                success = fs.delete(new Path(remotePath), true);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return success;
        }
    
        /**
         * 检查文件是否存在
         * @param remotePath HDFS文件路径
         * @return 是否存在
         */
        public boolean existFile(String remotePath){
            if(fs == null) return false;
            boolean exist = false;
            try {
                exist = fs.exists(new Path(remotePath));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return exist;
        }
    
    
        /**
         * 关闭FileSystem
         */
        public void closeFileSystem(){
            if(fs != null){
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    

    上面代码都有注释,这里就不过多解释了,下面来看主类的执行代码:

    package com.lzb.hdfs;
    
    
    import com.lzb.hdfs.fs.HDFSHelper;
    
    public class Demo {
        public static void main(String[] args) {
    
            HDFSHelper hdfsHelper = new HDFSHelper();
    
            String dir = "/test";
            String filename = "hello.txt";
            String path = dir + "/" + filename;
    
            boolean exist = hdfsHelper.existFile(path);
            System.out.println(path + " exist file ==> " + exist);
    
            if(!exist){
                boolean mkdir = hdfsHelper.mkdir(dir);
                System.out.println(dir + " create success ==> " + mkdir);
    
                boolean copyFromLocal = hdfsHelper.copyFromLocal("/"+filename, dir);
                System.out.println("upload success ==> " + copyFromLocal);
    
                hdfsHelper.listFiles(dir,false);
    
                String content = "hello world new";
                boolean write = hdfsHelper.writeToFile(path, content);
                System.out.println("write success ==> " + write);
    
                String data = hdfsHelper.readFromFile(path);
                System.out.println("read the data ==> " + data);
    
                String newPath = dir + "/hello2.txt";
                boolean renameFile = hdfsHelper.renameFile(path, newPath);
                System.out.println("rename success ==> " + renameFile);
    
                boolean copyToLocal = hdfsHelper.copyToLocal(newPath, "/hello2.txt");
                System.out.println("download success ==> " + copyToLocal);
    
                //boolean deleteFile = hdfsHelper.deleteFile(newPath);
                //System.out.println("delete success ==> " + deleteFile);
    
            }
    
            hdfsHelper.closeFileSystem();
    
        }
    }
    
    

    执行结果如下:

    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1866182384_1, ugi=root (auth:SIMPLE)]]
    /test/hello.txt exist file ==> false
    /test create success ==> true
    upload success ==> true
    file path === hdfs://probd/test/hello.txt
    file name === hello.txt
    file size === 12
    file owner === root
    file group === supergroup
    file permission === rw-r--r--
    file blocks === 3
    file block size === 134217728
    file block nums === 1
    block host === Probd01
    block host === Probd03
    block host === Probd02
    write success ==> true
    read the data ==> hello world new
    rename success ==> true
    download success ==> true
    

    相关文章

      网友评论

          本文标题:HDFS中API的使用

          本文链接:https://www.haomeiwen.com/subject/wariaqtx.html