美文网首页
Java读写Hadoop文件

Java读写Hadoop文件

作者: 主君_05c4 | 来源:发表于2019-04-20 16:12 被阅读0次
    1、从Hadoop URL读取文件

    示例一:

    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
    import org.apache.hadoop.io.IOUtils;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    public class HDFSFileReader {
    
        // Register the HDFS stream handler so java.net.URL understands "hdfs://".
        // JVM-wide: URL.setURLStreamHandlerFactory may be invoked at most once per JVM.
        static {
            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        }
    
        /** Opens an HDFS file through java.net.URL and copies it to stdout. */
        public static void main(String[] args) {
            final String fileUrl = "hdfs://192.168.1.100:9000/home/ossuser/test.txt";
            try {
                InputStream in = new URL(fileUrl).openStream();
                // 4096-byte copy buffer; "true" closes the streams when copying finishes.
                IOUtils.copyBytes(in, System.out, 4096, true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())使得Java程序能够识别hdfs url schema;全局只能调用一次,若其他组件已声明一个URLStreamHandlerFactory实例,将无法使用当前示例读取Hadoop文件数据。
    copyBytes最后两个参数,第一个设置复制缓冲区大小,第二个设置复制结束后是否关闭数据流。

    示例二:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    public class HDFSFileReader1 {
    
        /**
         * Reads an HDFS file via the FileSystem API and copies it to stdout.
         * The URI scheme ("hdfs") selects the concrete FileSystem implementation.
         */
        public static void main(String[] args) {
            Configuration conf = new Configuration();
    
            try {
                URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");
    
                FileSystem fs = FileSystem.get(uri, conf);
    
                Path path = new Path(uri);
                FSDataInputStream fsDataInputStream = fs.open(path);
                // 4096-byte buffer; "true" closes the stream when copying finishes.
                IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
            } catch (IOException | URISyntaxException e) {
                // Java 7 multi-catch replaces two catch blocks with identical bodies.
                e.printStackTrace();
            }
        }
    }
    
    • public static FileSystem get(Configuration conf) throws IOException返回默认文件系统(在 core-site.xml指定,若没有设置,返回默认的本地文件系统)
    • public static FileSystem get(URI uri, Configuration conf) throws IOException通过给定的URI方案确定要使用的文件系统,若URI没有指定方案,返回默认文件系统
    • public static FileSystem get(final URI uri, final Configuration conf, String user) throws IOException, InterruptedException使用给定用户访问文件系统

    FSDataInputStream对象
    FSDataInputStream继承了java.io.DataInputStream类,实现了Seekable接口,可从流任意位置开始读取文件。

    public interface Seekable {
        // Move to an absolute byte offset; seeking past the end of the file
        // raises IOException (unlike InputStream.skip(), which is relative).
        void seek(long pos) throws IOException;
    
        // Current absolute byte offset from the start of the stream.
        long getPos() throws IOException;
    
        // NOTE(review): marked @Private — internal Hadoop API, not for client use.
        @Private
        boolean seekToNewSource(long var1) throws IOException;
    }
    
    • seek()定位位置大于文件长度时,会抛出IOException
    • seek()绝对值定位,java.io.InputStream.skip()相对位置定位
    public class HDFSFileReader2 {
    
        public static void main(String[] args) {
            Configuration configuration = new Configuration();
            try {
                URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");
    
                FileSystem fileSystem = FileSystem.get(uri, configuration);
    
                FSDataInputStream fsDataInputStream = fileSystem.open(new Path(uri));
                IOUtils.copyBytes(fsDataInputStream, System.out, 4096, false);  -- 必须false
    
                System.out.println("-----------------------------");
                fsDataInputStream.seek(5);
                IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
            } catch (URISyntaxException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    输出:

    line1
    line2
    -----------------------------
    line2
    

    FSDataInputStream也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分

    /** Positional reads at an explicit file offset; none of these methods
     *  move the stream's current offset. */
    public interface PositionedReadable {
      /**
       * Read up to the specified number of bytes, from a given
       * position within a file, and return the number of bytes read. This does not
       * change the current offset of a file, and is thread-safe.
       *
       * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
       * @param position position within file
       * @param buffer destination buffer
       * @param offset offset in the buffer
       * @param length number of bytes to read
       * @return actual number of bytes read; -1 means "none"
       * @throws IOException IO problems.
       */
      int read(long position, byte[] buffer, int offset, int length)
        throws IOException;
      
      /**
       * Read the specified number of bytes, from a given
       * position within a file. This does not
       * change the current offset of a file, and is thread-safe.
       *
       * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
       * @param position position within file
       * @param buffer destination buffer
       * @param offset offset in the buffer
       * @param length number of bytes to read
       * @throws IOException IO problems.
       * @throws EOFException the end of the data was reached before
       * the read operation completed
       */
      void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException;
      
      /**
       * Read number of bytes equal to the length of the buffer, from a given
       * position within a file. This does not
       * change the current offset of a file, and is thread-safe.
       *
       * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
       * @param position position within file
       * @param buffer destination buffer
       * @throws IOException IO problems.
       * @throws EOFException the end of the data was reached before
       * the read operation completed
       */
      void readFully(long position, byte[] buffer) throws IOException;
    }
    

    read()方法,从文件position位置开始,读取最多length长度数据,存入至buffer指定偏移量offset处,返回值为实际读取的字节数,可能小于length,-1代表未读取到字节;readFully从文件position位置开始,读取指定length长度的字节至buffer,若读取字节数未达到length,就已到达文件尾,抛出EOFException异常.
    所有这些方法会保留文件的偏移量,且是线程安全,在读取文件的主体时,可以访问文件的其他部分。
    seek()是高开销的操作

    2、Hadoop写文件

    FileSystem有两种获取文件输出流的方式:create创建新文件,append在已有文件末尾追加:

    • public FSDataOutputStream create(Path f, Progressable progress) throws IOException;
    • public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    public class HDFSFileWriter {
    
        /**
         * Copies a local file into HDFS as user "ossuser", reporting write
         * progress through a Progressable callback.
         */
        public static void main(String[] args) {
            try {
                // Alternative way to select the HDFS user:
                //System.setProperty("HADOOP_USER_NAME", "ossuser");
    
                URI uri = URI.create("hdfs://192.168.1.100:9000/home/ossuser/log1.txt");
    
                Configuration configuration = new Configuration();
                // The third argument runs the file-system calls as this user.
                FileSystem fs = FileSystem.get(uri, configuration, "ossuser");
    
                // try-with-resources closes both streams even when fs.create()
                // succeeds but opening the local file throws (the original
                // leaked the output stream in that case).
                try (FSDataOutputStream fos =
                         fs.create(new Path(uri), new CreateHDFSFileProgress("log1.txt"));
                     FileInputStream fis = new FileInputStream("D:/temp/xx.txt.tmp")) {
                    // close=false: the try-with-resources block owns the streams.
                    IOUtils.copyBytes(fis, fos, 4096, false);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    
        /** Progress callback printed as data is flushed to the cluster. */
        static class CreateHDFSFileProgress implements Progressable {
    
            private final String fileName;
            private int count;
    
            public CreateHDFSFileProgress(String fileName) {
                this.fileName = fileName;
            }
    
            @Override
            public void progress() {
                count++;
                System.out.println(fileName + " has written " + (count * 64) + " KB");
            }
        }
    }
    

    本地开发调试,将以当前操作系统用户名往HDFS目标文件夹写文件,会报无权限异常,如下:

    org.apache.hadoop.security.AccessControlException: Permission denied: user=z00442530, access=WRITE, inode="/home/ossuser":ossuser:supergroup:drwxr-xr-x
    

    两种方式设置用户名:

    • System.setProperty("HADOOP_USER_NAME", "ossuser");
    • FileSystem fs = FileSystem.get(uri, configuration, "ossuser");

    create方法有多个重载版本,可传入包括是否覆盖文件、缓冲区大小、副本数、块大小、文件权限、进度回调接口
    Hadoop每写入约64KB数据至datanode后,即调用一次progress()方法

    相关文章

      网友评论

          本文标题:Java读写Hadoop文件

          本文链接:https://www.haomeiwen.com/subject/rtdkgqtx.html