美文网首页
HDFS之JAVA API学习笔记

HDFS之JAVA API学习笔记

作者: 我是老薛 | 来源:发表于2018-11-28 20:27 被阅读0次

    本文是对HDFS的JAVA API操作的一个学习总结,包括如下章节的内容:

    • 概述
    • 目录和文件操作
    • 文件上传和下载
    • 读写数据操作
    • 本地文件系统支持

    参考资料:

    1、本文介绍的内容依赖hadoop环境,关于hadoop运行环境的搭建可参见《Hadoop运行环境搭建》

    2、如果想了解下HDFS的基本概念,可先阅读《HDFS学习笔记》

    一、概述

    我们除了通过命令行接口访问HDFS系统外,还可以通过hadoop类库提供的Java API编写java程序来访问HDFS系统,如进行文件的上传、下载、目录的创建、文件的删除等各种文件操作。

    hadoop类库中提供的HDFS操作的核心API是FileSystem抽象类,该类提供了一系列方法来进行相关的操作。

    FileSystem是一个通用的文件系统API,它是一个抽象类,可以通过其提供的静态工厂方法来获取该类的实例。获取HDFS文件系统FileSystem的实例最常用的静态方法是:

    static FileSystem get(Configuration conf);

    参数Configuration对象是对文件系统中属性信息的封装,默认的属性来自hadoop配置文件core-site.xml中的配置,当然也可以通过代码进行属性的设置。进行文件操作的基本流程是:

    1、创建Configuration对象

    2、利用FileSystem 的静态get方法获取FileSystem 实例

    3、调用FileSystem 的方法进行实际的文件操作

    下面我们通过实际的例子来进行学习。

    二、目录和文件操作API

    1、创建目录

    利用FileSystem的mkdirs方法可以创建文件或目录,其定义如下:

    public boolean mkdirs(Path f) throws IOException;

    该方法的参数用于指定待创建的目录路径,需要说明的是,mkdirs命令可以级联创建目录,即如果指定待创建的目录的上级目录不存在,则会一次帮创建。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class CreateDir {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
    HDFS.mkdirs(new Path(args[0]));
                System.out.println("cerate success");
            } catch (IOException e) {
                System.out.println("cerate error");
                e.printStackTrace();
            } 
        }
    }
    

    上面代码是一个普通的带main方法的java类。main方法中第二行代码如下:

    conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");

    上面代码的目的是设置本程序(也就是HDFS客户端)所访问的HDFS服务的地址。如果不通过代码进行设置,则默认会取hadoop安装环境下core-site.xml文件中配置的"fs.defaultFS"的属性值。所以,如果程序是运行在服务器上(即name节点所在机器)则不用通过代码进行显示设置。

    代码有了,那我们怎么编译和执行程序呢?这个和处理mapreduce程序类似,可以参考《mapreduce学习笔记》一文中的介绍。这里再简单介绍下。

    我们在IDE中编写上面程序,要想代码可以编译成功,只需要引入hadoop-common-2.7.6.jar。但运行的时候,需要依赖更多的Jar包。最简单的运行方式是,将程序编译后的class打成jar包,然后在命令行下利用hadoop jar命令来执行,该命令会自动引入执行程序需要的jar包。

    假设上面代码打成jar包的名称为testhdfs.jar,则在命令行下运行如下命令(待创建的目录路径是通过参数传入):

    hadoop jar testhdfs.jar com.hdfs.CreateDir /testdir

    执行后,根目录下就会生成一个testdir目录。

    下面介绍的例子可以利用同样的方法进行编译和运行,后续不会再重复介绍。

    2、删除目录或文件

    利用FileSystem的delete方法可以删除文件或目录,其定义如下:

    public boolean delete(Path f, boolean recursive) throws IOException

    如果参数f指定的是文件或空目录,则参数recursive的值被忽略;

    如果参数f指定的目录不未空,则如果参数recursive的值为false,则执行会抛异常,只有值为true,才会将该目录及其下内容全部删除。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class DeleteFile {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                boolean re = HDFS.delete(new Path(args[0]), true);
                System.out.println("delete:"+re);
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    3、重命名(移动)文件或目录

    利用FileSystem的rename方法可以重命名文件或目录,如果被移动的文件或目录其父目录发生变化,则相当于移动文件或目录。该方法定义如下:

    public boolean rename(Path src,Path dest) throws IOException

    该方法有两个参数,第一个参数是被操作的文件或目录的路径,第二参数是待修改后的路径。如果操作成功,方法的返回值为true,否则为false。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ReName {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                boolean re = HDFS.rename(new Path(args[0]), new Path(args[1]));
                System.out.println("rename:"+re);
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    4、判断文件或目录是否存在

    利用FileSystem的exists方法可以判断指定的文件或目录是否存在,其定义如下:

    public boolean exists(Path f) throws IOException;

    如果存在,方法返回值为true,否则为false。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class CheckExist {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                boolean re = HDFS.exists(new Path(args[0]));
                System.out.println("is exist:"+re);
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    5、判断是文件还是目录

    利用FileSystem的isFile方法可以判断指定的路径是不是文件;利用isDirectory方法可以判断指定的路径是不是目录。两个方法的定义如下:

    public boolean isFile(Path f) throws IOException;

    public boolean isDirectory(Path f) throws IOException;

    如果存在指定的路径不存在,则上面两个方法都返回false。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class IsFile {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                boolean re = HDFS.isDirectory(new Path(args[0]));
                System.out.println("isDirectory:"+re);
                re = HDFS.isFile(new Path(args[0]));
                System.out.println("isFile:"+re);
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    6、查看目录和文件的内容

    利用FileSystem的listStatus方法可以获取指定的文件的信息或目录下内容的信息,其定义如下:

    public FileStatus[] listStatus(Path arg0)

    throws FileNotFoundException, IOException

    该方法返回一个FileStatus类型的对象数组,FileStatus对象中封装了文件(或目录的)各种信息,如名称,访问时间,大小等。方法的参数是一个路径,如果路径指向一个文件,则返回的是数组就一个元素,代表该文件的信息;如果路径指向的是是一个目录,则返回的是该目录下子目录和文件信息。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ShowFile {
        public static void main(String[] args) throws IOException {
            Configuration conf = null;
            FileSystem fs = null;
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            fs = FileSystem.get(conf);
            FileStatus[] listStatus = fs.listStatus(new Path(args[0]));
            for(FileStatus fileStatus:listStatus){
                String fileName = fileStatus.getPath().getName();
                String type = fileStatus.isFile()?"file":"dir";
                long size = fileStatus.getLen();
                System.out.println(fileName+","+type+","+size);
            }
        }
    }
    

    7、复制文件或目录

    上面的例子都是利用FileSystem中的方法进行文件和目录的操作,在文件操作中,还有一个常见的操作就是文件和目录的复制。但是FileSystem没有提供接口来进行复制操作。这时可以利用FileContext接口提供的操作来实现。具体我们看一个例子,代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileContext.Util;
    import org.apache.hadoop.fs.Path;
    
    public class CopyFile {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileContext fileContext = FileContext.getFileContext(conf);
                Util util = fileContext.util();
                boolean re=util.copy(new Path(args[0]), new Path(args[1]));
                System.out.println("copy:"+re);
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    上面代码先利用FileContext 的静态方法获取到一个FileContext 对象,类似获取FileSystem方法,需要传入一个Configuration 对象的参数。然后再获取一个Util对象。Util对象提供的copy方法可以完成复制操作。copy方法有两个参数,第1个参数是待复制的文件或目录,第2个参数是复制后的路径(含文件名或目录名),要求文件或目录名不存在。如果复制成功,copy方法返回true。

    Util对象中也有些方法,类似FileSystem中的方法可以完成相同功能的一些文件操作。

    三、文件上传和下载操作API

    1、上传本地文件

    利用FileSystem的copyFromLocalFile方法可以将本地文件或目录(及目录下所有内容)上传到HDFS系统上,其定义如下:

    public void copyFromLocalFile(Path src, Path dst) throws IOException;

    1)该方法有两个参数,第一个参数是待上传的文件或目录的本地路径,第二个参数是HDFS系统上的目的路径。

    2)如果待上传的是一个目录,则会把整个目录及目录下的所有内容上传。

    3)如果目的路径是一个目录,则会把本地文和目录上传到该目的目录下,被上传的文件或目录名不变。如果目的路径是一个不存在的路径,则会把本地文和目录上传到该目的路径的上级目录下,被上传的文件或目录名变为目的路径指定的名称。

    4)copyFromLocalFile还有多个重载方法,可以选择上传后是否同时删除本地文件或目录,以及是否覆盖目的文件或目录。

    例子代码如下:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class PutFile {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                HDFS.copyFromLocalFile(new Path(args[0]), new Path(args[1]));
                System.out.println("action success");
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    2、下载文件到本地

    利用FileSystem的copyToLocalFile方法可以将HDFS系统上的文件或目录(及目录下所有内容)下载到本地系统上,其定义如下:

    public void copyToLocalFile(Path src, Path dst) throws IOException;

    该方法的使用与上面介绍的上传本地文件或目录到HDFS系统上的copyFromLocalFile方法非常类似,区别只是源路径和目的路径正好相反,这里不再详细介绍。直接看一个例子:

    package com.hdfs;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class PutFile {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            try {
                FileSystem HDFS = FileSystem.get(conf);
                HDFS.copyToLocalFile(new Path(args[0]), new Path(args[1]));
                System.out.println("action success");
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    }
    

    四、读写数据操作API

    1、读取数据

    调用FileSystem的open方法(参数是HDFS系统上的一个文件)会返回一个FSDataInputStream对象,利用该对象可以读取文件中的数据。

    我们先看一个例子代码,该例子是读取一个文本文件中的内容:

    package com.hdfs;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ReadData {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            BufferedReader reader = null;
            String line = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                FSDataInputStream in = HDFS.open(new Path(args[0]));
                reader = new BufferedReader(new InputStreamReader(in));
                while ((line = reader.readLine()) != null){
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(reader!=null){
                    try {
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    上面程序,首先调用FileSystem的open方法获取到一个FSDataInputStream对象,是一个数据流对象,通过它提供的方法可以读取流中的数据,但该类的方法都是一些级别较低的底层方法,不适合用来读取文本文件,对于文本文件,我们一般习惯逐行读取数据,所以使用了JAVA IO 中的BufferedReader来包装FSDataInputStream对象来读取数据。最后要在finally语句块中关闭BufferedReader对象。

    如果要读取二进制文件,可利用工具类IOUtils中的方法,如下面例子:

    package com.hdfs;
    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class ReadData2 {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            FSDataInputStream in = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                in = HDFS.open(new Path(args[0]));
                OutputStream out = System.out;
                IOUtils.copyBytes(in, out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                IOUtils.closeStream(in);
            }
        }
    }
    

    上面程序使用的IOUtils的copyBytes方法(它有多个重载的方法),该方法是个通用的方法,用于将一个数据流中的数据写入另外一个数据流,它有4个参数,第1个参数是InputStream类型,即被读取出来的数据流,在这个例子中就是FSDataInputStream 对象,第2个参数是OutputStream类型,即要写入的数据流对象,这里为了演示简单,直接使用了系统输出对象System.out,第3个参数表示本次读写操作数据的规模(这里是4096个字节),第4个参数是个布尔值,为true,表示调用完毕后关闭流对象。

    需要说明的是,如果第4个参数设置为true,则因为调用后文件流被关闭了,不能再次读取。如果要能连续多次调用IOUtils的copyBytes方法,则第4个参数不能设置为true,以为在finally语句块中也进行了关闭操作,所以不会导致文件流不会被关闭。FSDataInputStream 对象内部有一个位置指针,被读取一次后,指针会指向读取数据后的位置,再次读取,会从新的位置往后读取。

    同时FSDataInputStream 对象支持随机访问,可以调用其seek方法定位到文件指定的位置开始读取,注意不能超过文件的长度。

    我们看下面一个例子,利用seek方法多次读取数据,代码如下:

    package com.hdfs;
    import java.io.IOException;
    import java.io.OutputStream;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class ReadData2 {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            FSDataInputStream in = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                in = HDFS.open(new Path(args[0]));
                OutputStream out = System.out;
                IOUtils.copyBytes(in, out, 4096,false);
                System.out.println("read again");
                in.seek(0);
                IOUtils.copyBytes(in, out, 4096, false);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                IOUtils.closeStream(in);
            }
        }
    }
    

    需要注意的是,seek方法是相对高开销的操作,需要慎重使用。对于大数据,建议采用顺序读取的方式(可采用Mapreduce来进行读取数据)。

    2、创建文件写入数据

    调用FileSystem的create方法(该方法有多个重载方法)会创建一个空的文件,同时该方法会返回一个FSDataOutputStream对象,利用该对象可以往文件中写入数据。如果create方法指定的新文件所在的上级目录不存在,会自动帮创建。

    下面我们先看一个例子,该例子创建一个文件,写入文本信息:

    package com.hdfs;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class WriteData {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            BufferedWriter writer = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                FSDataOutputStream out = HDFS.create(new Path(args[0]));
                writer = new BufferedWriter(new OutputStreamWriter(out));
                writer.write("hello1");
                writer.newLine();
                writer.write("hello2");
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(writer!=null){
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    上面代码先调用FileSystem的create方法创建一个空文件(如果文件已存在,则会被覆盖),该方法返回一个FSDataOutputStream 对象,FSDataOutputStream 对象中有很多写数据的方法,不过直接使用不太方便。我们这里是准备下写入文本信息,因此利用了java io中的 BufferedWriter 来写入文本信息。

    如果要写入二进制数据,可利用工具类IOUtils中的方法,如下面例子:

    package com.hdfs;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class WriteData2 {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            FSDataOutputStream out=null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                out = HDFS.create(new Path(args[0]));
                InputStream in = buildInputBuffer();
                IOUtils.copyBytes(in , out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                IOUtils.closeStream(out);
            }
        }
    
        private static InputStream buildInputBuffer() {
            byte[] byt = new byte[1024];
            for(int i=0;i<1024;i++){
                byt[i]=66;
            }
            InputStream in = new ByteArrayInputStream(byt);
            return in;
        }
    }
    

    上面程序使用的IOUtils的copyBytes方法与上面从HDFS文件中读取数据例子中使用的是一样的方法。只是我们这里是从一个字节流(为了简化,自己构建的)中读取数据写入到create方法返回的FSDataOutputStream 对象中。

    一般来说,往HDFS中写入数据都是数据量比较大的,整个过程所需时间较长。为了让客户端能获取写入过程中的状态,FileSystem的create另一个重载方法提供了一个参数(Progressable接口)可以获取数据写入的进度。具体我们看一个例子,例子代码如下:

    package com.hdfs;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    
    public class WriteData3 {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            FSDataOutputStream out=null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                out = HDFS.create(new Path(args[0]),new Progressable() {
                    @Override
                    public void progress() {
                        System.out.print(".");
                    }
                });
                InputStream in = buildInputBuffer();
                IOUtils.copyBytes(in , out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                IOUtils.closeStream(out);
            }
        }
    
        private static InputStream buildInputBuffer() {
            int size = 1024*10000;
            byte[] byt = new byte[size];
            for(int i=0;i<size;i++){    
                byt[i]=66;
            }
            InputStream in = new ByteArrayInputStream(byt);
            return in;
        }
    }
    

    相比前面的例子,在本例子中,FileSystem的create方法多了一个参数,是一个Progressable接口对象,该接口有一个progress方法,HDFS在写入数据的过程中,会根据进度不断调用progress方法,这样我们实现progress方法就能知道数据写入的进度了。

    3、追加数据到已有的文件中

    在某些场景下,我们需要往已经存在文件的末尾追加写入数据。这时我们可以调用FileSystem的append方法来打开一个文件,并返回FSDataOutputStream对象,然后就可以写入数据。除了使用append方法替换create方法,其它操作都一样,这里不再重复介绍。可以看一个简单例子,代码如下:

    package com.hdfs;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class AppendData {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");
            BufferedWriter writer = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                FSDataOutputStream out = HDFS.append(new Path(args[0]));
                writer = new BufferedWriter(new OutputStreamWriter(out));
                writer.write("newdata");
                writer.newLine();
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(writer!=null){
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    可以看出,往已存在文件中添加数据,只需将create方法换成append方法。需要注意的是,append方法指定的文件必须已经存在,如果不存在,h不会自动创建,而是会报异常。

    五、本地文件系统支持

    在本文中,我们通过多个例子介绍了如何使用hadoop提供的JAVA API来操作HDFS系统。可以看出,核心就是FileSystem类中提供的各种方法的使用。在最上面我们也提到,FileSystem类是一个通用的文件系统操作API,不仅可以用来操作HDFS系统,也可以用来访问其它文件系统,比如本地文件系统。

    这就带来一个好处,我们在集群环境下开发和调试代码比较麻烦,这样我们可以先访问本地的文件系统来进行代码逻辑的验证,如果代码没问题,我们可以再到集群环境下去验证。这有点和mapreduce程序也可以在本地运行类似。

    我们把上面写数据的例子改成到本地环境下运行,代码如下:

    package com.hdfs;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class WriteData {
        public static void main(String[] args) {
            Configuration conf=new Configuration(); 
            conf.set("fs.defaultFS", "file:///");
            BufferedWriter writer = null;
            try {
                FileSystem HDFS = FileSystem.get(conf);
                FSDataOutputStream out = HDFS.create(new Path(args[0]));
                writer = new BufferedWriter(new OutputStreamWriter(out));
                writer.write("hello1");
                writer.newLine();
                writer.write("hello2");
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(writer!=null){
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    可以看出,与前面例子相比,我们只是改了一句代码,将

    conf.set("fs.defaultFS", "hdfs://192.168.1.3:8020");

    语句改为

    conf.set("fs.defaultFS", "file:///");

    设置"fs.defaultFS"属性为"file:///",意思就是访问的是本地文件系统。上面程序无论是在linux或海上windows下都可以运行。如在linux下运行:

    hadoop jar testhdfs.jar /home/hadoop/test.txt

    在windows下运行:

    hadoop jar testhdfs.jar d:/test.txt

    需要特别注意的是,FileSystem中的方法并不是全部能够在各种文件系统中运行,比如append方法经过测试发现本地文件系统就不支持。所以在开发调试时需要特别注意这点。

    相关文章

      网友评论

          本文标题:HDFS之JAVA API学习笔记

          本文链接:https://www.haomeiwen.com/subject/wxazqqtx.html