Using the Hadoop Client API to Access HDFS
1. Create a Java project
2. Import the Hadoop libraries
Note
I created the Java project in IDEA and use Maven to manage the jar dependencies.
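The test methods below are shown without their import statements. A rough sketch of the imports they rely on (assuming JUnit 4 and the Hadoop client classes) looks like this:

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;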
@Test
public void readFile() throws Exception {
    System.out.println("HELLO");
    // Register the HDFS protocol handler so java.net.URL understands hdfs:// URLs.
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    URL url = new URL("hdfs://20.18.5.1:8020/user/hadoop/hadoop/test.txt");
    URLConnection conn = url.openConnection();
    InputStream is = conn.getInputStream();
    byte[] buf = new byte[is.available()];
    is.read(buf);
    is.close();
    String str = new String(buf);
    System.out.println(str);
}
When running the test, I planned to add the Hadoop dependencies one at a time as they were needed, since there are so many of them. The JDK does not know the hdfs protocol out of the box, so the line that constructs the URL fails unless URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); has been called first. FsUrlStreamHandlerFactory lives in hadoop-common.jar, so at first I only imported hadoop-common:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.1</version>
</dependency>
However, even after adding that dependency, the test still reported the following error:
java.net.MalformedURLException: unknown protocol: hdfs
at java.net.URL.<init>(URL.java:593)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at TestHDFS.readFile(TestHDFS.java:17)
So I added the related dependencies one by one and found that the test runs normally once the dependency below is in place. hadoop-client pulls in hadoop-common transitively, so the explicit hadoop-common dependency above can be removed:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
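One caveat: URL.setURLStreamHandlerFactory may only be called once per JVM, so calling it in several test methods throws an Error on the second call. A minimal sketch (not from the original post) is to register the factory once in a static initializer of the test class:

public class TestHDFS {
    static {
        // Register the hdfs:// protocol handler exactly once for the whole JVM.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }
    // ... test methods go here ...
}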
Servers, racks, server rooms, and data centers
![](https://img.haomeiwen.com/i10338666/25c7a8ec186a3f8e.png)
API
blade: a blade server
rack: a server rack (here, one rack holds 6 blade servers, and each rack has its own switch)
A server room contains multiple racks, and the racks also exchange data with each other through switches.
A data center contains multiple server rooms.
![](https://img.haomeiwen.com/i10338666/fb2dd256627e9b62.png)
Java code for accessing HDFS through the Hadoop API
@Test
public void readFileByURL() throws Exception {
    System.out.println("HELLOURL");
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    InputStream in = new URL("hdfs://20.18.5.1:8020/user/hadoop/hadoop/test.txt").openStream();
    // available() is only a hint of how much can be read without blocking;
    // it works for a small test file but is not a reliable way to size a whole file.
    byte[] buf = new byte[in.available()];
    in.read(buf);
    in.close();
    String str = new String(buf);
    System.out.println(str);
}
@Test
public void readFileByAPI() throws Exception {
    Configuration conf = new Configuration();
    // fs.default.name is the old key name; fs.defaultFS is its current replacement.
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/hadoop/hadoop/test.txt");
    FSDataInputStream fis = fs.open(path);
    byte[] buf = new byte[1024];
    int len = -1;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    while ((len = fis.read(buf)) != -1) {
        baos.write(buf, 0, len);
    }
    fis.close();
    baos.close();
    System.out.println("---------------------------------------");
    System.out.println(baos);
    System.out.println(new String(baos.toByteArray()));
    System.out.println(baos.toByteArray());
    System.out.println(baos.toByteArray().toString());
    System.out.println("---------------------------------------");
}
@Test
public void readFileByAPI2() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/hadoop/hadoop/test.txt");
    FSDataInputStream fis = fs.open(path);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // copyBytes with close=true closes both streams when the copy finishes.
    IOUtils.copyBytes(fis, baos, 1024, true);
    System.out.println(new String(baos.toByteArray()));
}
@Test
public void mkdir() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(new Path("/user/hadoop/myhadoop"));
}
@Test
public void putFile() throws Exception {
    Configuration conf = new Configuration();
    // URLStr is presumably a field of the test class holding the namenode address
    // (the hdfs://20.18.5.1:8020/ URL used in the methods above).
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream fos = fs.create(new Path("/user/hadoop/myhadoop/a.txt"));
    fos.write("hello lll".getBytes());
    fos.close();
}
@Test
public void removeFile() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    // The second argument enables recursive deletion of the directory.
    fs.delete(new Path("/user/hadoop/myhadoop"), true);
}
@Test
public void appendFile() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream fos = fs.append(new Path("/user/hadoop/myhadoop/a.txt"));
    fos.write("hello xxx".getBytes());
    fos.close();
}
@Test
public void test1() throws Exception {
    // No explicit fs.default.name here: this assumes the cluster's core-site.xml /
    // hdfs-site.xml are on the classpath so that new Configuration() resolves nn1.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream fis = fs.open(new Path("hdfs://nn1/user/hadoop/hello.txt"));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    IOUtils.copyBytes(fis, baos, 1024);
    baos.close();
    fis.close();
    System.out.println(new String(baos.toByteArray()));
}
@Test
public void test2() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream fout = fs.create(new Path("hdfs://nn1/user/hadoop/a.txt"));
    fout.write("hello boy".getBytes());
    fout.close();
}
@Test
public void test3() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // create(path, overwrite, bufferSize, replication, blockSize):
    // the file is written with replication 2 and a 1024-byte block size.
    FSDataOutputStream fout = fs.create(new Path("hdfs://nn1/user/hadoop/a.txt"),
            true, 1024, (short) 2, 1024);
    FileInputStream fios = new FileInputStream("e:/a.txt");
    IOUtils.copyBytes(fios, fout, 1024);
    fout.close();
    fios.close();
}
Block size configuration: the problem hit in test3 above and how to fix it
The block size configured for Hadoop must be a multiple of 512. During an HDFS write the data is checksummed in 512-byte chunks, so the block size has to be at least 512 bytes and a multiple of 512.
![](https://img.haomeiwen.com/i10338666/bf4392d301b640a4.png)
You need to add the following property to hdfs-site.xml on the node to lower the minimum block size; after the change, restart the cluster with start-dfs.sh:
<property>
    <name>dfs.namenode.fs-limits.min-block-size</name>
    <value>1024</value>
</property>
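For reference, here is a minimal client-side sketch (not from the original post, and assuming the default dfs.bytes-per-checksum of 512) of validating a proposed block size against this constraint before calling fs.create:

@Test
public void checkBlockSize() throws Exception {
    // Hypothetical check: the block size must be a positive multiple of the
    // checksum chunk size (dfs.bytes-per-checksum, 512 bytes by default).
    Configuration conf = new Configuration();
    long bytesPerChecksum = conf.getLong("dfs.bytes-per-checksum", 512);
    long blockSize = 1024;
    if (blockSize < bytesPerChecksum || blockSize % bytesPerChecksum != 0) {
        throw new IllegalArgumentException(
                "block size must be a multiple of " + bytesPerChecksum);
    }
    // The NameNode additionally enforces dfs.namenode.fs-limits.min-block-size.
}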