1. HDFS Java demo
First, add the required Maven dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.luozheng.hadoop</groupId>
    <artifactId>Hadoop-Hdfs-01</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <hadoop.version>3.0.1</hadoop.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.codehaus.woodstox/woodstox-core-lgpl -->
        <dependency>
            <groupId>org.codehaus.woodstox</groupId>
            <artifactId>woodstox-core-lgpl</artifactId>
            <version>4.4.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-collections/commons-collections -->
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-configuration2</artifactId>
            <version>2.2</version>
        </dependency>
    </dependencies>
</project>
Why there are quite so many dependencies I honestly can't say; I never found an official list, so each one was added as the corresponding error showed up.
Test program:
package org.luozheng.hadoop;

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileSystemApp {

    public static void main(String[] args) {
        FileSystem fileSystem = null;
        try {
            // 1. Get a handle on the Hadoop file system.
            Configuration config = new Configuration();
            String fsRootUrl = config.get("fs.defaultFS");
            System.out.println("fs.defaultFS = " + fsRootUrl);
            fileSystem = FileSystem.newInstance(config);
            System.out.println("" + fileSystem);

            // 2. Upload a local file to the file system.
            String localFilePath = "/home/luozheng/eclipseworkspace/Hadoop-Hdfs-01/src/main/java/org/luozheng/hadoop/FileSystemApp.java";
            FileInputStream inStream = new FileInputStream(localFilePath);
            String dstUrl = fsRootUrl + "/user/luozheng/hadooptest/FileSystemApp.java";
            Path dstPath = new Path(dstUrl);
            FSDataOutputStream outStream = fileSystem.create(dstPath, new Progressable() {
                public void progress() {
                    // System.out.println(".");
                }
            });
            // IOUtils.copyBytes(inStream, outStream, config);
            // The 'true' flag closes both streams once the copy finishes.
            IOUtils.copyBytes(inStream, outStream, 4096, true);
            System.err.println("file upload successful !!!");

            // 3. Download the file and print it; close only the HDFS input stream
            // ourselves so that System.out stays open.
            FSDataInputStream dstInstream = fileSystem.open(dstPath);
            IOUtils.copyBytes(dstInstream, System.out, config, false);
            dstInstream.close();
            System.err.println("file download successful !!!");
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the FileSystem handle.
            IOUtils.closeStream(fileSystem);
        }
    }
}
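The demo above only uploads a file and streams it back. For reference, here is a minimal sketch of a few other commonly used FileSystem calls; the class name FileSystemExtraOps and the paths are illustrative only, and it assumes the same fs.defaultFS configuration is on the classpath.

package org.luozheng.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper showing exists/mkdirs/listStatus/delete, not part of the original demo.
public class FileSystemExtraOps {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path dir = new Path("/user/luozheng/hadooptest");   // illustrative path
            // Create the directory if it does not exist yet.
            if (!fileSystem.exists(dir)) {
                fileSystem.mkdirs(dir);
            }
            // List the directory contents with their sizes.
            for (FileStatus status : fileSystem.listStatus(dir)) {
                System.out.println(status.getPath() + " " + status.getLen() + " bytes");
            }
            // Delete a single file (non-recursive).
            fileSystem.delete(new Path(dir, "FileSystemApp.java"), false);
        }
    }
}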
2. MapReduce Java demo
package org.luozheng.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        System.out.println("[luozheng] map key=" + key.get() + ",value=" + value.toString());
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            // Emit a (word, 1) pair for each token; the reducer sums the counts per word.
            context.write(word, one);
        }
    }
}
package org.luozheng.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        System.out.println("[luozheng] reduce key=" + key.toString() + ",value=" + values);
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
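Because the reduce step is a plain associative sum, the same class can also serve as a combiner. Before the reusable driver below, here is a minimal standalone driver sketch that wires the mapper and reducer together and registers the combiner; the class name WordCountWithCombiner is hypothetical and is shown only to illustrate the standard Job wiring.

package org.luozheng.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical standalone driver; the reusable AbstractMapReducer driver follows below.
public class WordCountWithCombiner {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count with combiner");
        job.setJarByClass(WordCountWithCombiner.class);
        job.setMapperClass(WordCountMapper.class);
        // The reducer doubles as a combiner because summing is associative.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // args[0] is the input path, args[1] the (not yet existing) output path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}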
package org.luozheng.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

/**
 * input -> map -> reduce -> output
 * @author root
 */
@SuppressWarnings("rawtypes")
public class AbstractMapReducer extends Configured implements Tool {

    private Class mapperClazz = null;
    private Class mapOutKeyClazz = null;
    private Class mapOutValueClazz = null;
    private Class reducerClazz = null;

    /**
     * AbstractMapReducer constructor.
     * @param mapperClazz MapperClass
     * @param mapOutKeyClazz MapOutputKeyClass
     * @param mapOutValueClazz MapOutputValueClass
     * @param reducerClazz ReducerClass
     */
    public AbstractMapReducer(Class mapperClazz, Class mapOutKeyClazz, Class mapOutValueClazz, Class reducerClazz) {
        setConf(new Configuration());
        this.mapperClazz = mapperClazz;
        this.mapOutKeyClazz = mapOutKeyClazz;
        this.mapOutValueClazz = mapOutValueClazz;
        this.reducerClazz = reducerClazz;
    }

    @SuppressWarnings({ "unchecked" })
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("[luozheng] please provide an input data file and an output path!");
            return -1;
        }
        // Step 1: create the JobConf.
        JobConf jobConf = new JobConf(getConf());
        // jobConf.setJarByClass(this.getClass());
        // // Set the mapper (old mapred API, kept for reference).
        // jobConf.setMapperClass(mapperClazz);
        // jobConf.setMapOutputKeyClass(mapOutKeyClazz);
        // jobConf.setMapOutputValueClass(mapOutValueClazz);
        // // Set the reducer (old mapred API, kept for reference).
        // jobConf.setReducerClass(reducerClazz);
        // Step 2: create a Job from the JobConf.
        Job job = Job.getInstance(jobConf, this.getClass().getName());
        job.setJarByClass(this.getClass());
        // Step 3: set the mapper.
        job.setMapperClass(mapperClazz);
        job.setMapOutputKeyClass(mapOutKeyClazz);
        job.setMapOutputValueClass(mapOutValueClazz);
        // Step 4: set the reducer.
        job.setReducerClass(reducerClazz);
        // The first argument is the input data path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The second argument is the path where the results are written.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // The 'true' argument controls whether job progress is printed.
        return (job.waitForCompletion(true) ? 0 : -1);
    }
}
package org.luozheng.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMapReducer extends AbstractMapReducer {

    public WordCountMapReducer() {
        super(WordCountMapper.class, Text.class, IntWritable.class, WordCountReducer.class);
        // Sanity check that WordCountMapper really is a Mapper subclass.
        System.out.println("====>" + Mapper.class.isAssignableFrom(WordCountMapper.class));
    }

    public static void main(String[] args) throws Exception {
        WordCountMapReducer mapReducer = new WordCountMapReducer();
        String hdfsRootUrl = mapReducer.getConf().get("fs.defaultFS");
        System.out.println(hdfsRootUrl);
        // Hard-coded HDFS input and output paths for this demo.
        args = new String[] {
                hdfsRootUrl + "/user/luozheng/hadooptest/test",
                hdfsRootUrl + "/user/luozheng/hadooptest/test-luozheng-output"
        };
        Configuration configuration = new Configuration();
        int result = ToolRunner.run(configuration, mapReducer, args);
        System.out.println("[luozheng] result = " + result);
    }
}
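For the job to have data to count, the input path used above must already exist on HDFS, and the output directory must not exist yet, otherwise FileOutputFormat refuses to start the job. As a hedged sketch (the class name WriteTestInput and the sample text are illustrative), the input file can be created with the same FileSystem API from section 1:

package org.luozheng.hadoop;

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper that writes a small input file for the WordCount job.
public class WriteTestInput {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        try (FileSystem fileSystem = FileSystem.newInstance(config)) {
            Path input = new Path("/user/luozheng/hadooptest/test");
            // Overwrite the file if it already exists.
            try (FSDataOutputStream out = fileSystem.create(input, true)) {
                out.write("hello hadoop\nhello hdfs\nhello mapreduce\n"
                        .getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}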