1. Reading a file from a Hadoop URL
Example 1:
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

public class HDFSFileReader {
    static {
        // Register the handler so that java.net.URL understands hdfs:// URLs.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        try {
            InputStream inputStream = new URL("hdfs://192.168.1.100:9000/home/ossuser/test.txt").openStream();
            // Copy to stdout with a 4096-byte buffer; close the streams afterwards.
            IOUtils.copyBytes(inputStream, System.out, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
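URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) makes the Java program recognize the hdfs URL scheme. It can be called only once per JVM: if another component has already registered a URLStreamHandlerFactory instance, this example cannot be used to read data from Hadoop.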
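The once-per-JVM restriction can be observed with the JDK alone. The sketch below is only an illustration: the class name OneShotFactoryDemo and the placeholder factory (which returns null so the JDK falls back to its built-in handlers) are assumptions made here, not part of Hadoop. It assumes no other code in the JVM has registered a factory beforehand.

```java
import java.net.URL;
import java.net.URLStreamHandlerFactory;

class OneShotFactoryDemo {
    /** Tries to register a factory; returns false if one is already registered. */
    static boolean tryInstall(URLStreamHandlerFactory factory) {
        try {
            URL.setURLStreamHandlerFactory(factory);
            return true;
        } catch (Error alreadyDefined) {
            // The JDK signals a repeated registration with java.lang.Error.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical placeholder: returning null defers to the JDK's default handlers.
        URLStreamHandlerFactory placeholder = protocol -> null;
        System.out.println("first call installed:  " + tryInstall(placeholder));
        System.out.println("second call installed: " + tryInstall(placeholder));
    }
}
```

In a fresh JVM the first call should succeed and the second should be rejected, which is why a library that registers its own factory blocks the FsUrlStreamHandlerFactory approach.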
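copyBytes: of the last two parameters, the first sets the size of the copy buffer and the second controls whether the streams are closed once the copy has finished.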
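To make the meaning of those two parameters concrete, here is a simplified plain-JDK sketch of a copy routine with the same parameter shape. It is not Hadoop's implementation; the class CopyBytesSketch and its copy method are names invented for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class CopyBytesSketch {
    /** Copies in to out through a bufferSize-byte buffer; closes both streams if close is true. */
    static void copy(InputStream in, OutputStream out, int bufferSize, boolean close) throws IOException {
        try {
            byte[] buffer = new byte[bufferSize];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            out.flush();
        } finally {
            if (close) {
                try {
                    out.close();
                } finally {
                    in.close();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "line1\nline2\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(data), sink, 4096, true);
        System.out.print(new String(sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Passing false for the last argument leaves both streams open, which matters whenever the input stream is reused after the copy.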
Example 2:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSFileReader1 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");
            // The URI scheme (hdfs) selects the FileSystem implementation.
            FileSystem fs = FileSystem.get(uri, conf);
            Path path = new Path(uri);
            FSDataInputStream fsDataInputStream = fs.open(path);
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
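public static FileSystem get(Configuration conf) throws IOException
Returns the default file system (specified in core-site.xml; if none is set, the local file system is returned).
public static FileSystem get(URI uri, Configuration conf) throws IOException
Uses the scheme of the given URI to determine which file system to use; if the URI specifies no scheme, the default file system is returned.
public static FileSystem get(final URI uri, final Configuration conf, String user) throws IOException, InterruptedException
Accesses the file system as the given user.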
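The scheme-based selection described above can be modeled with java.net.URI alone. This is a rough sketch of the decision, not Hadoop code: the class SchemeResolutionSketch, the method effectiveScheme, and the use of file:/// as the default file system URI are assumptions made for illustration.

```java
import java.net.URI;

class SchemeResolutionSketch {
    /** Returns the scheme that decides which file system is used, falling back to the default URI. */
    static String effectiveScheme(URI uri, URI defaultFs) {
        String scheme = uri.getScheme();
        return scheme != null ? scheme : defaultFs.getScheme();
    }

    public static void main(String[] args) {
        // Assumed default: the local file system, as when core-site.xml sets nothing.
        URI defaultFs = URI.create("file:///");
        System.out.println(effectiveScheme(URI.create("hdfs://192.168.1.100:9000/home/ossuser/test.txt"), defaultFs)); // hdfs
        System.out.println(effectiveScheme(URI.create("/home/ossuser/test.txt"), defaultFs)); // file
    }
}
```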
The FSDataInputStream object
FSDataInputStream extends the java.io.DataInputStream class and implements the Seekable interface, so it can start reading from any position in the file:
public interface Seekable {
    void seek(long pos) throws IOException;
    long getPos() throws IOException;
    @Private
    boolean seekToNewSource(long var1) throws IOException;
}
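If seek() is given a position greater than the length of the file, it throws an IOException.
seek() moves to an absolute position, whereas java.io.InputStream.skip() moves relative to the current position.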
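The absolute/relative distinction is easy to see without a cluster. The sketch below uses java.io.RandomAccessFile as a stand-in for FSDataInputStream, which is an assumption made for illustration (note that RandomAccessFile, unlike the behavior described above, allows seeking past the end of the file). The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class SeekVersusSkip {
    /** seek(pos) is absolute: repeating it lands on the same byte both times. */
    static String seekTwice(Path file, long pos) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(pos);
            char first = (char) raf.read();
            raf.seek(pos);
            char second = (char) raf.read();
            return "" + first + second;
        }
    }

    /** skip(n) is relative: repeating it moves on from wherever the stream currently is. */
    static String skipTwice(byte[] data, long n) throws IOException {
        try (InputStream in = new ByteArrayInputStream(data)) {
            in.skip(n);
            char first = (char) in.read();
            in.skip(n);
            char second = (char) in.read();
            return "" + first + second;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        Path file = Files.createTempFile("seek-demo", ".txt");
        try {
            Files.write(file, data);
            System.out.println(seekTwice(file, 3)); // 33
            System.out.println(skipTwice(data, 3)); // 37
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```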
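import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSFileReader2 {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");
            FileSystem fileSystem = FileSystem.get(uri, configuration);
            FSDataInputStream fsDataInputStream = fileSystem.open(new Path(uri));
            // Must be false: the stream has to stay open for the seek() below.
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, false);
            System.out.println("-----------------------------");
            // Jump to absolute offset 5 and print the rest of the file.
            fsDataInputStream.seek(5);
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}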
Output:
line1
line2
-----------------------------
line2
FSDataInputStream also implements the PositionedReadable interface, which reads part of a file starting at a specified offset:
public interface PositionedReadable {
    /**
     * Read up to the specified number of bytes, from a given
     * position within a file, and return the number of bytes read. This does not
     * change the current offset of a file, and is thread-safe.
     *
     * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
     * @param position position within file
     * @param buffer destination buffer
     * @param offset offset in the buffer
     * @param length number of bytes to read
     * @return actual number of bytes read; -1 means "none"
     * @throws IOException IO problems.
     */
    int read(long position, byte[] buffer, int offset, int length)
        throws IOException;

    /**
     * Read the specified number of bytes, from a given
     * position within a file. This does not
     * change the current offset of a file, and is thread-safe.
     *
     * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
     * @param position position within file
     * @param buffer destination buffer
     * @param offset offset in the buffer
     * @param length number of bytes to read
     * @throws IOException IO problems.
     * @throws EOFException the end of the data was reached before
     * the read operation completed
     */
    void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException;

    /**
     * Read number of bytes equal to the length of the buffer, from a given
     * position within a file. This does not
     * change the current offset of a file, and is thread-safe.
     *
     * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
     * @param position position within file
     * @param buffer destination buffer
     * @throws IOException IO problems.
     * @throws EOFException the end of the data was reached before
     * the read operation completed
     */
    void readFully(long position, byte[] buffer) throws IOException;
}
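read() reads up to length bytes starting at position in the file and stores them in buffer beginning at offset. The return value is the number of bytes actually read, which may be less than length; -1 means that no bytes were read.
readFully() reads exactly length bytes starting at position into buffer; if the end of the file is reached before length bytes have been read, it throws an EOFException.
All of these methods preserve the current file offset and are thread-safe (with the caveat from the Javadoc that not every file system honors this), so another part of the file can be accessed while its main body is being read.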
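The same contract can be tried locally with java.nio. The sketch below uses FileChannel.read(ByteBuffer, long), a positional read that leaves the channel's own position unchanged, as an analogue of PositionedReadable. It is an illustration under that assumption, not Hadoop code, and the class PositionedReadSketch is a hypothetical name.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PositionedReadSketch {
    /** Reads exactly length bytes starting at position; the channel's own position is not moved. */
    static void readFully(FileChannel channel, long position, byte[] buffer, int offset, int length)
            throws IOException {
        ByteBuffer target = ByteBuffer.wrap(buffer, offset, length);
        long filePos = position;
        while (target.hasRemaining()) {
            // A single positional read may deliver fewer bytes than requested.
            int n = channel.read(target, filePos);
            if (n < 0) {
                throw new EOFException("End of file reached before " + length + " bytes were read");
            }
            filePos += n;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("pread-demo", ".txt");
        try {
            Files.write(file, "0123456789".getBytes(StandardCharsets.US_ASCII));
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                byte[] buffer = new byte[3];
                readFully(channel, 4, buffer, 0, buffer.length);
                System.out.println(new String(buffer, StandardCharsets.US_ASCII)); // 456
                System.out.println(channel.position()); // 0
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Requesting more bytes than remain after the given position (for example 5 bytes at position 8 of this 10-byte file) ends in an EOFException, mirroring the readFully contract quoted above.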
seek() is a high-overhead operation, so call it sparingly.
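2. Writing files to Hadoop
FileSystem provides two ways to obtain an output stream for writing, create() for a new file and append() for an existing one: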
public FSDataOutputStream create(Path f, Progressable progress) throws IOException;
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;

public class HDFSFileWriter {
    public static void main(String[] args) {
        try {
            //System.setProperty("HADOOP_USER_NAME", "ossuser");
            URI uri = URI.create("hdfs://192.168.1.100:9000/home/ossuser/log1.txt");
            Configuration configuration = new Configuration();
            // Access the file system as user "ossuser".
            FileSystem fs = FileSystem.get(uri, configuration, "ossuser");
            FSDataOutputStream fos = fs.create(new Path(uri), new CreateHDFSFileProgress("log1.txt"));
            FileInputStream fis = new FileInputStream("D:/temp/xx.txt.tmp");
            // Copy the local file to HDFS and close both streams afterwards.
            IOUtils.copyBytes(fis, fos, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Progress callback that reports roughly how much data has been written. */
    static class CreateHDFSFileProgress implements Progressable {
        private String fileName;
        private int count;

        public CreateHDFSFileProgress(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void progress() {
            count++;
            System.out.println(fileName + " has written " + (count * 64) + " KB");
        }
    }
}
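During local development and debugging, the client writes to the HDFS target directory under the current operating system user name, which can fail with a permission error such as: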
org.apache.hadoop.security.AccessControlException: Permission denied: user=z00442530, access=WRITE, inode="/home/ossuser":ossuser:supergroup:drwxr-xr-x
There are two ways to set the user name:
System.setProperty("HADOOP_USER_NAME", "ossuser");
FileSystem fs = FileSystem.get(uri, configuration, "ossuser");
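As a rough mental model of the first option only, the sketch below shows a lookup order in which an explicitly configured HADOOP_USER_NAME takes precedence over the operating system user. The order used here (environment variable, then system property, then OS user) is an assumption about Hadoop's behavior under simple, non-Kerberos authentication, and the class HadoopUserSketch and its resolveUser method are hypothetical names, not Hadoop API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

class HadoopUserSketch {
    static final String KEY = "HADOOP_USER_NAME";

    /** Picks the effective user: environment variable, then system property, then the OS user. */
    static String resolveUser(Map<String, String> env, Properties props, String osUser) {
        String user = env.get(KEY);
        if (user == null) {
            user = props.getProperty(KEY);
        }
        return user != null ? user : osUser;
    }

    public static void main(String[] args) {
        Map<String, String> env = Collections.emptyMap();
        Properties props = new Properties();
        System.out.println(resolveUser(env, props, "z00442530")); // z00442530
        props.setProperty(KEY, "ossuser");
        System.out.println(resolveUser(env, props, "z00442530")); // ossuser
    }
}
```

The second option, passing the user name to FileSystem.get, does not depend on this lookup because the user is stated explicitly in the call.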
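The create() method has several overloads whose parameters include whether to overwrite an existing file, the buffer size, the replication factor, the block size, the file permissions, and a progress callback.
Hadoop calls the progress() method once after roughly every 64 KB of data written to the DataNode.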
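The callback cadence can be imitated locally to check the arithmetic in a progress reporter. The sketch below is a plain-JDK model, not the HDFS client: it uses Runnable in place of Progressable, and the class ProgressCopySketch, the method copyWithProgress, and the fixed 64 KB chunk size are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class ProgressCopySketch {
    /** Copies in to out and runs the callback once per chunkSize bytes written; returns total bytes. */
    static long copyWithProgress(InputStream in, OutputStream out, int chunkSize, Runnable progress)
            throws IOException {
        byte[] buffer = new byte[4096];
        long total = 0;
        long sinceLastCallback = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
            sinceLastCallback += n;
            while (sinceLastCallback >= chunkSize) {
                progress.run();
                sinceLastCallback -= chunkSize;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        byte[] data = new byte[200 * 1024];
        long written = copyWithProgress(
                new ByteArrayInputStream(data), new ByteArrayOutputStream(), 64 * 1024, () -> calls[0]++);
        System.out.println(written + " bytes, " + calls[0] + " callbacks"); // 204800 bytes, 3 callbacks
    }
}
```

With 200 KB of input, the callback fires at 64, 128 and 192 KB, so a counter multiplied by 64 slightly under-reports the final size, just as in the CreateHDFSFileProgress example.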