创建一个maven工程
pom.xml引入如下依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
复制hadoop环境下面的配置文件到resources目录
复制四个配置文件(通常是 core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml)
编写HDFS客户端
package com.zhanghh.train;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
/**
 * JUnit tests that exercise basic HDFS client operations through the
 * {@link FileSystem} API: creating a directory, uploading and downloading a
 * file, listing block locations and deleting a directory.
 *
 * <p>The tests operate on the fixed HDFS paths {@code /tmp} and
 * {@code /tmp/test.txt} and on hard-coded local Windows paths, so they are
 * meant to be run manually against a development cluster.
 */
public class HdfsTest {
    Configuration conf;
    FileSystem fs;

    /**
     * Builds the configuration and obtains the file system handle used by every test.
     *
     * @throws Exception if the file system cannot be obtained
     */
    @Before
    public void before() throws Exception {
        // loadDefaults = true: read the default configuration resources found on the classpath.
        conf = new Configuration(true);
        // Obtain the FileSystem API that matches the loaded configuration.
        fs = FileSystem.get(conf);
    }

    /**
     * Releases the file system handle.
     *
     * @throws Exception if closing the file system fails
     */
    @After
    public void after() throws Exception {
        // JUnit still runs @After when @Before threw; guard so a setup failure
        // is not masked by a NullPointerException here.
        if (fs != null) {
            fs.close();
        }
    }

    /**
     * Creates the {@code /tmp} directory, removing any existing one first.
     *
     * @throws Exception if an HDFS operation fails
     */
    @Test
    public void mkdir() throws Exception {
        Path path = new Path("/tmp");
        // NOTE(review): this recursively deletes everything under /tmp before
        // recreating it - confirm that is acceptable on the target cluster.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        fs.mkdirs(path);
    }

    /**
     * Uploads a local file to the HDFS file {@code /tmp/test.txt}.
     *
     * @throws Exception if the local file cannot be read or the HDFS write fails
     */
    @Test
    public void uploadFile() throws Exception {
        Path path = new Path("/tmp/test.txt");
        File file = new File("C:\\Users\\zhanghh\\Desktop\\test.txt");
        // Open the local source first: if it is missing we fail before touching
        // HDFS instead of leaving an empty target file and an unclosed stream.
        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file))) {
            if (fs.exists(path)) {
                fs.delete(path, false);
            }
            try (FSDataOutputStream outputStream = fs.create(path)) {
                // close = false: try-with-resources owns both streams.
                IOUtils.copyBytes(inputStream, outputStream, conf, false);
            }
        }
    }

    /**
     * Downloads the HDFS file {@code /tmp/test.txt} to a local file.
     *
     * @throws Exception if the HDFS file cannot be read or the local write fails
     */
    @Test
    public void downFile() throws Exception {
        Path path = new Path("/tmp/test.txt");
        // The HDFS stream is opened first, so a missing source does not create
        // the local target; both streams are closed even if the copy fails.
        try (FSDataInputStream inputStream = fs.open(path);
             OutputStream outputStream = new BufferedOutputStream(
                     new FileOutputStream(new File("C:\\Users\\zhanghh\\Desktop\\222.txt")))) {
            IOUtils.copyBytes(inputStream, outputStream, conf, false);
        }
    }

    /**
     * Prints the block locations of {@code /tmp/test.txt} for its whole length.
     *
     * @throws Exception if the file status or block locations cannot be fetched
     */
    @Test
    public void blockLoction() throws Exception {
        Path path = new Path("/tmp/test.txt");
        FileStatus fss = fs.getFileStatus(path);
        // Ask for the locations covering the byte range [0, file length).
        BlockLocation[] locations = fs.getFileBlockLocations(fss, 0, fss.getLen());
        for (BlockLocation obj : locations) {
            System.out.println(obj);
        }
    }

    /**
     * Recursively deletes the {@code /tmp} directory if it exists.
     *
     * @throws Exception if an HDFS operation fails
     */
    @Test
    public void delete() throws Exception {
        Path path = new Path("/tmp");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }
}
使用MapReduce编写一个简单的WordCount计算
package com.zhanghh.train;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver that configures and submits the word-count MapReduce job.
 *
 * <p>Reads {@code /tmp/test.txt}, writes the result to {@code /tmp/output}
 * (removing a previous output directory first) and uses
 * {@code MyWordCount.MyMapper} / {@code MyWordCount.MyReducer}.
 *
 * @author zhanghh
 * @create 2019/8/25
 */
public class MapperReduceTest {
    /**
     * Configures the job, submits it and waits for it to finish.
     * Terminates the JVM with exit status 1 when the job does not succeed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWordCount.class); // locate the job jar via a class it contains
        job.setJobName("zhanghh_job"); // job name

        Path inputPath = new Path("/tmp/test.txt");
        FileInputFormat.addInputPath(job, inputPath); // input path

        Path outputPath = new Path("/tmp/output");
        // Remove a previous output directory so the job can write to this path again.
        if (outputPath.getFileSystem(conf).exists(outputPath)) {
            outputPath.getFileSystem(conf).delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath); // output path

        job.setMapperClass(MyWordCount.MyMapper.class); // mapper class
        job.setMapOutputKeyClass(Text.class); // mapper output key type
        job.setMapOutputValueClass(IntWritable.class); // mapper output value type
        job.setReducerClass(MyWordCount.MyReducer.class); // reducer class
        // Declare the final output types to match what the reducer emits.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job, then poll for progress until the job is complete.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            // Surface job failure to the caller/shell instead of exiting with status 0.
            System.exit(1);
        }
    }
}
----------------------------------------------------------------------------------------------------------
package com.zhanghh.train;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.StringTokenizer;
/**
 * Word count: the mapper and reducer used to count token occurrences.
 *
 * @author zhanghh
 * @create 2019/8/27
 */
public class MyWordCount {

    /**
     * Splits each input value into tokens with {@link StringTokenizer}
     * (default delimiters) and emits {@code (token, 1)} for every token.
     */
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        /** Constant count written for every token. */
        private final IntWritable unit = new IntWritable(1);
        /** Reused output key holding the current token. */
        private final Text token = new Text();

        /**
         * Emits one {@code (token, 1)} pair per token of {@code value}.
         *
         * @param key     input key (not used)
         * @param value   text to tokenize
         * @param context sink for the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            for (StringTokenizer tokens = new StringTokenizer(value.toString());
                 tokens.hasMoreTokens(); ) {
                token.set(tokens.nextToken());
                context.write(token, unit);
            }
        }
    }

    /**
     * Sums all counts received for a key and emits {@code (key, sum)}.
     */
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /** Reused output value holding the sum for the current key. */
        private final IntWritable total = new IntWritable();

        /**
         * Adds up {@code values} and writes the total for {@code key}.
         *
         * @param key     the token being counted
         * @param values  the counts emitted for that token
         * @param context sink for the emitted pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int accumulated = 0;
            for (IntWritable count : values) {
                accumulated = accumulated + count.get();
            }
            total.set(accumulated);
            context.write(key, total);
        }
    }
}
网友评论