美文网首页
HDFS的API简介

HDFS的API简介

作者: 水木清华_f221 | 来源:发表于2018-12-20 14:22 被阅读20次

    背景

    从事大数据以来,主要是spark的开发,时不时总是会用到一些hdfs的上接口;半途开始大数据,对hdfs不是很熟悉。每次用都是百度。所以这里做个总结,为了以后能更好的用好hdfs。

    先说下几个重要的类

    FileSystem

    FileSystem这个类非常重要,相当于一个连接,类似数据库中一个连接;连接还是比较费资源的,因此可以弄一个连接池,后面复用这些连接就可以了。
    一般获取的代码都是固定不变的。如下:

    public static FileSystem getDFS() throws IOException {
        conf = new Configuration();
        //设置安全验证方式为kerberos
        conf.set("hadoop.security.authentication", "kerberos");
        
        conf.set("fs.defaultFS", "hdfs://...");
        
        return FileSystem.get(conf);
    }
    
    1. 首先需要new一个Configuration,如果configuration中没有设置相关的参数,那么会取集群的配置文件,获取里面的配置信息。(一般我们也建议,不设置;因为主节点可能会变)
    2. 然后直接调用FileSystem.get()方法就可以得到一个FileSystem对象了。

    FileSystem中有很多方法,跟File中的方法非常类似,如exists、delete、mkdir、create等等一些常用的文件操作方法。

    这里主要看读和写方法,读是open方法

      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file to open
       */
      public FSDataInputStream open(Path f) throws IOException {
        return open(f, getConf().getInt("io.file.buffer.size", 4096));
      }
    

    这个方法有一个读缓存,默认是4096,如果想要设置这个值,可以使用DistributedFileSystem类中open方法

    open方法就是得到一个输入流,这里再次强调java中的io相当重要啊,要是理解了java中的io这里的操作看下api就会了。

    写的话,只有append方法,而且一般是不推荐不适用该方法的,这个代价会比较大。hdfs文件系统也是不支持修改操作的。append方法见名知意,就是在文件后面进行追加。(因为文件是分块存放的,而且还有几个副本,修改的代价会非常大)

    create方法也要说一下,这个创建文件也分为两种,覆盖和不覆盖;create有很多重载的方法,选择一个自己用的就可以了。

    还有几种比较重要的方法;后面会提到,什么globStatus之类的

    FileStatus

    字面意思是文件的状态,其实我更倾向于理解为文件的属性,FileStatus中有一系列的方法,可以得到文件的信息。

    像一些getLen()得到文件的长度,以字节的形式。isFile、isDirectory、getReplication一些见名知意的方法,就不多说了。

    setOwner、setGroup、setPermission这些修改的方法,在外面没法使用,就不多说了。

    之所有要先说这个FileStatus,主要是为了好介绍下面非常有用的方法。

    globStatus

    这个不是一个类,是一个方法,但是很重要。
    我们经常需要处理一批文件,有通配符来匹配是非常方便的。FileSystem提供了两个通配符的方法

    public FileStatus[] globStatus(Path pathPattern) throws IOException
    
    public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
    

    globStatus()方法返回于其路径匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter可以作为选项进一步对匹配结果进行限制。对于通配符的支持可以看另一篇文章hadoop支持的通配符

    IOUtils

    IOUtils这样类,在文件系统中一般都会有,这里主要是介绍hadoop中的。

    下面来看个例子,将本地文件复制到Hadoop文件系统

    public static void main(String[] args) throws IOException {
            File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
            InputStream in = new BufferedInputStream(new FileInputStream(file));
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://...:9000");
            FileSystem fileSystem = FileSystem.get(conf);
            float fileSize = file.length();
            FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("hdfs://.../world.pdf"), new Progressable() {
                int count = 1;
                @Override
                public void progress() {
                    System.out.println((count * 1024 * 64)/fileSize);
                    count ++;
                }
            });
    
            IOUtils.copyBytes(in, fsDataOutputStream, 4096, true);
        }
    

    Progressable类主要是展示进度的,重写的 progress 方法在每次上传了64KB(不是绝对的64KB,会有一定的偏差)字节大小的文件之后会自动调用一次;因此我们给出一个大概的上传进度。
    我们这里主要是IkanIOUtils.copyBytes方法,这个方法有很多重载的。

    先看刚刚使用的

      /**
       * Copies from one stream to another.
       *
       * @param in InputStrem to read from
       * @param out OutputStream to write to
       * @param buffSize the size of the buffer 
       * @param close whether or not close the InputStream and 
       * OutputStream at the end. The streams are closed in the finally clause.  
       */
      public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)  throws IOException 
    

    复制整个流,可以设置缓冲的大小。
    注释写的非常清楚,一般也是建议使用这个方法,特别是boolean close这个参数,最好传递true,免得流没有关闭,导致其他的一些问题。有一个方法和这个非常类似

      /**
       * Copies count bytes from one stream to another.
       *
       * @param in InputStream to read from
       * @param out OutputStream to write to
       * @param count number of bytes to copy
       * @param close whether to close the streams
       * @throws IOException if bytes can not be read or written
       */
      public static void copyBytes(InputStream in, OutputStream out, long count, boolean close) throws IOException 
    

    这个方法是复制long count的byte的数据。这个用的非常少,但是很容易和上面弄混。

    FileUtil

    和其他文件系统一样,hadoop中也有FileUtil这个工具类。先来看看这个stat2Paths,这个方法会将一个数组的status转化为数组的path对象。

      /**
       * convert an array of FileStatus to an array of Path
       * 
       * @param stats
       *          an array of FileStatus objects
       * @return an array of paths corresponding to the input
       */
      public static Path[] stat2Paths(FileStatus[] stats) {
        if (stats == null)
          return null;
        Path[] ret = new Path[stats.length];
        for (int i = 0; i < stats.length; ++i) {
          ret[i] = stats[i].getPath();
        }
        return ret;
      }
    

    很多方法,看下api基本上就会用了,下面主要介绍一下,上传和下载;首先来看看上传,也就是将文件本地拷贝到hadoop文件系统,上面用IOUtils实现了一遍,下面用FileUtil也来实现一遍

    private static void copyFileByFile() throws IOException {
            File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs:/...:9000");
            FileSystem fileSystem = FileSystem.get(conf);
            FileUtil.copy(file, fileSystem, new Path("/dxy/hello.pdf"), false, conf);
        }
    

    这个使用比IOUtils来的要方便点,但是没有进度的展示。根据实际需求使用吧

    从hadoop中下载和上面非常类似,就是hadoop文件系统拷贝到本地

    private static void copyFileToLocal() throws IOException {
            File file = new File("E:\\文件\\学习/hello.pdf");
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://...:9000");
            FileSystem fileSystem = FileSystem.get(conf);
            FileUtil.copy(fileSystem, new Path("/dxy/hello.pdf"), file, false, conf);
        }
    

    还有hadoop上复制文件,和上面的也非常类似,只是目标的参数变了。

    下面要介绍一个比较重要的方法,合并文件,因为hadoop中对小文件的处理是非常不擅长的,因此我们可能需要对小文件进行合并。FileUtil中提供了一个方法copyMerge方法,

     /** Copy all files in a directory to one output file (merge). */
      public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                      FileSystem dstFS, Path dstFile, 
                                      boolean deleteSource,
                                      Configuration conf, String addString) throws IOException {
        // 简直合并之后的文件名是否满足要求
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);   
    
        // 如果要合并的目录,下面还是目录,则返回false
        if (!srcFS.getFileStatus(srcDir).isDirectory())
          return false;
       // 创建目标文件
        OutputStream out = dstFS.create(dstFile);
        
        try {
          FileStatus contents[] = srcFS.listStatus(srcDir);
          Arrays.sort(contents);  // 按照文件名进行排序
          for (int i = 0; i < contents.length; i++) {
            if (contents[i].isFile()) {
              InputStream in = srcFS.open(contents[i].getPath());
              try {
                // 将要合并的文件复制到目标文件中,这里conf传递过去,就是为了得到io.file.buffer.size参数,作为写缓存的大小
                IOUtils.copyBytes(in, out, conf, false);
                if (addString!=null)
                  // 每个合并文件完了之后,添加一个addString
                  out.write(addString.getBytes("UTF-8"));
                    
              } finally {
                in.close();
              } 
            }
          }
        } finally {
          out.close();
        }
        
    
        if (deleteSource) {
          return srcFS.delete(srcDir, true);
        } else {
          return true;
        }
      }  
    

    代码还是比较简单的。如果只是简单的合并,这个方法完全够用了。如果有个需求,合并之后,仍然可以找到之前文件名对应的文件内容;当然我们也可以改写这个方法,将addString改为文件的名称,只需将添加addString代码去掉,添加上下面这行就行了。

         out.write(contents[i].getPath().getName().getBytes("UTF-8"));
    

    当然为了更快的读到想要的内容,我也可以弄一个类似的目录的东西,快速定位到文件内容在哪。

    hadoop的api还是很容易上手的,多用多总结。

    相关文章

      网友评论

          本文标题:HDFS的API简介

          本文链接:https://www.haomeiwen.com/subject/tbeykqtx.html