为简化代码而做的配置
将 core-site.xml 复制到项目的 classpath 根目录(如 src/main/resources)下,配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- Address of the HDFS NameNode -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://pdc:9000</value>
</property>
<!-- Directory where Hadoop stores files generated at runtime -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
</configuration>
代码模板
@Test
public void mkdirAtHDFS() throws Exception{
//1. Create the configuration object
Configuration configuration = new Configuration();
//2. Get the file system; if core-site.xml above is on the classpath, passing only the configuration is enough
FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"),configuration, "root");
//3. Call the API (placeholder -- replace with a real operation such as fs.mkdirs(...))
fs.xxx(xxx);
}
1.获取文件系统
// Obtain the FileSystem handle; the address comes from fs.defaultFS in core-site.xml
FileSystem fs = FileSystem.get(configuration);
2.文件上传
API:
// Local path of the file to upload
Path src = new Path("e:/pdc.txt");
// Target path on HDFS
Path dst = new Path("hdfs://pdc:9000/user/pdc");
// Copy the file to HDFS; first arg delSrc=true deletes the local source afterwards
fs.copyFromLocalFile(true,src, dst);
注:这里也演示不经 Hadoop 专用封装、直接用 Java 原生 IO 流的写法——后续计算可能基于更快的 Spark,使用原生 IO 可以减少代码对 Hadoop API 的依赖。
IO:
//Input stream over the local file
FileInputStream inStream = new FileInputStream(new File("e:/hello.txt"));
//Destination path on HDFS
String putFileName = "hdfs://pdc:9000/user/pdc/hello1.txt";
Path writePath = new Path(putFileName);
//Output stream that creates the file on HDFS
FSDataOutputStream outStream = fs.create(writePath);
// 5 Pipe the streams: 4096-byte buffer; close=false because both streams are closed explicitly in finally
try{
IOUtils.copyBytes(inStream, outStream, 4096, false);
}catch(Exception e){
e.printStackTrace();
}finally{
IOUtils.closeStream(inStream);
IOUtils.closeStream(outStream);
}
3.文件下载
API:
// args: delSrc=false (keep the HDFS copy), src, dst, useRawLocalFileSystem=true (skip the local .crc checksum file)
// NOTE(review): this path uses "atguigu" while every other example uses "pdc" -- likely a copy-paste leftover; verify the user directory
fs.copyToLocalFile(false, new Path("hdfs://pdc:9000/user/atguigu/hello.txt"), new Path("e:/hellocopy.txt"), true);
IO:
//Path of the file to read on HDFS
String filename = "hdfs://pdc:9000/user/pdc/hello1.txt";
//Build the read path
Path readPath = new Path(filename);
// 4 Open an input stream on HDFS
FSDataInputStream inStream = fs.open(readPath);
//Pipe the stream to the console; close=false because the stream is closed in finally (System.out must stay open)
try{
IOUtils.copyBytes(inStream, System.out, 4096, false);
}catch(Exception e){
e.printStackTrace();
}finally{
IOUtils.closeStream(inStream);
}
4.创建目录
// Create the directory (including any missing parent directories) on HDFS
fs.mkdirs(new Path("hdfs://pdc:9000/user/pdc/output"));
5.删除文件夹
// Delete the directory; recursive=true removes its contents as well
fs.delete(new Path("hdfs://pdc:9000/user/root/output"), true);
6.修改文件名
// Rename (move) a file within HDFS
fs.rename(new Path("hdfs://pdc:9000/user/root/hello.txt"), new Path("hdfs://pdc:9000/user/root/hellonihao.txt"));
7.查看文件详情
// Recursively list every file under "/" -- note this returns an iterator, not a collection
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
while (files.hasNext()) {
    LocatedFileStatus status = files.next();
    // Basic attributes: name, block size, permissions, length in bytes
    System.out.println(status.getPath().getName());
    System.out.println(status.getBlockSize());
    System.out.println(status.getPermission());
    System.out.println(status.getLen());
    // For each block: its byte offset and the hosts storing its replicas
    for (BlockLocation location : status.getBlockLocations()) {
        System.out.println("block-offset:" + location.getOffset());
        for (String host : location.getHosts()) {
            System.out.println(host);
        }
    }
}
8.查看文件夹
//获取查询路径下的文件状态信息
// Fetch the status of every entry directly under "/" (non-recursive)
FileStatus[] entries = fs.listStatus(new Path("/"));
// Print each entry, prefixed "f--" for files and "d--" for directories
for (FileStatus entry : entries) {
    String prefix = entry.isFile() ? "f--" : "d--";
    System.out.println(prefix + entry.getPath().getName());
}
9.IO流定位文件读取
下载第一块
@Test
// Download only the first 128 MB block of the file (the default HDFS block size in 2.7.2)
public void readFileSeek1() throws Exception {
    // Configuration object; connect to the NameNode as user "root"
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"), configuration, "root");
    // Source file on HDFS
    Path path = new Path("hdfs://pdc:9000/user/root/tmp/hadoop-2.7.2.tar.gz");
    // Input stream from HDFS, output stream to the local disk
    FSDataInputStream fis = fs.open(path);
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part1");
    try {
        // Copy exactly one block (128 MB). The original loop ignored read()'s
        // return value and always wrote the full 1024-byte buffer, which writes
        // stale bytes on a short read and never stops early at end of file.
        byte[] buf = new byte[1024];
        long remaining = 128L * 1024 * 1024;
        while (remaining > 0) {
            int len = fis.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (len == -1) {
                break; // EOF before a full block: the file is shorter than 128 MB
            }
            fos.write(buf, 0, len);
            remaining -= len;
        }
    } finally {
        // Close streams even if the copy fails
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }
}
下载第二块
@Test
// Download everything after the first 128 MB block
public void readFileSeek2() throws Exception{
    // Connect to HDFS as user "root"
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://pdc:9000"), configuration, "root");
    // Open the source file on HDFS and the local target for the second part
    FSDataInputStream in = fs.open(new Path("hdfs://pdc:9000/user/pdc/tmp/hadoop-2.7.2.tar.gz"));
    FileOutputStream out = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part2");
    // Skip the first block: position the stream at byte offset 128 MB
    in.seek(1024 * 1024 * 128);
    // Copy the remainder using a 1 KB buffer
    IOUtils.copyBytes(in, out, 1024);
    // Release both streams
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
}
合并文件
cmd中:
type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
作者:木棉上的光
来源:CSDN
原文:https://blog.csdn.net/qq_41594698/article/details/89685084
版权声明:本文为博主原创文章,转载请附上博文链接!
网友评论