Develop a MapReduce job locally, then package it and upload it to the Hadoop server
I. Getting started: wordcount
1. Create a new Maven project with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.luo</groupId>
    <artifactId>wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>wordcount</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
        </dependency>
    </dependencies>
</project>
Then create the source directory src/main/java (Maven's default source root).
2. Write the mapper: src/main/java/WordCountMapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of input
        String line = value.toString();
        // Split the line on spaces
        String[] words = line.split(" ");
        // Emit (word, 1) for every word on the line
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
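For example, for an input line such as "good morning good" (an assumed sample, not from the original), map is called once and emits (good,1), (morning,1), (good,1); the actual counting is deferred to the reducer.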
Because a MapReduce job has to ship data across the network, every key and value must be serializable. Hadoop defines its own serializable (Writable) types: Long corresponds to LongWritable, String to Text, and int to IntWritable.
For each line it reads from the input, the framework calls the map method defined here once, passing it a key-value pair. The four type parameters of Mapper<LongWritable, Text, Text, IntWritable> are the input key type, input value type, output key type, and output value type. By default the input key is the byte offset at which the line starts and the input value is the line's content.
The output key-value pair is whatever the user chooses to emit.
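Under the hood, each Writable serializes itself to a DataOutput and reads itself back from a DataInput; this is what lets the framework move intermediate pairs between nodes. A minimal round-trip sketch (the WritableDemo class is purely illustrative and not part of the job):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize a (Text, IntWritable) pair to raw bytes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("good").write(out);
        new IntWritable(1).write(out);

        // Deserialize the pair back from those bytes
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + "=" + count.get()); // prints good=1
    }
}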
3. Write the reducer: src/main/java/WordCountReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum up all the counts collected for this word
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
The four type parameters of Reducer<Text, IntWritable, Text, IntWritable> are, again, the input key type, input value type, output key type, and output value type. Note that reduce receives its values as an Iterable: before map output reaches the reducer it goes through a sort-and-merge (shuffle) phase, so map results such as (good,1)(good,1)(good,1) arrive at reduce as the key good with the values (1,1,1).
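Traced end to end on a small assumed two-line input, the data flow looks like this:

input file:     good morning
                good night
map output:     (good,1) (morning,1) (good,1) (night,1)
after shuffle:  good -> (1,1)   morning -> (1)   night -> (1)
reduce output:  (good,2) (morning,1) (night,1)

Because this reducer's input and output types are identical (Text, IntWritable), the same class could also be registered as a combiner via job.setCombinerClass(WordCountReducer.class) to pre-aggregate counts on the map side; that is an optional optimization, not part of the original code.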
4. Write the driver: src/main/java/WordCountMapReduce.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMapReduce {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration object
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf, "wordcount");
        // Identify the jar to ship to the cluster by the class it contains
        job.setJarByClass(WordCountMapReduce.class);
        // Set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the reducer class
        job.setReducerClass(WordCountReducer.class);
        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the reduce (final) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        if (!b) {
            System.out.println("wordcount job failed!");
        }
    }
}
Then package the project, upload the jar to the Hadoop server, and run it (see the command sketch below). Success!
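For reference, a typical build-and-run sequence might look like the following; the server host, upload path, and the input file words.txt are assumptions, so adjust them to your environment:

mvn clean package                                              # builds target/wordcount-1.0-SNAPSHOT.jar
scp target/wordcount-1.0-SNAPSHOT.jar user@hadoop-server:~/    # hypothetical host and path
hadoop fs -mkdir -p /input
hadoop fs -put words.txt /input                                # any text file to count
hadoop jar wordcount-1.0-SNAPSHOT.jar WordCountMapReduce /input /output
hadoop fs -cat /output/part-r-00000                            # view the word counts

Note that the output directory must not already exist, or FileOutputFormat will refuse to start the job.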