package mr;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyGL {
private static class MyGLMapper extends Mapper<LongWritable, Text, Text, Text>{
/*输入类型是LongWritable,Text(上下文);
输出类型是Text,Text(也就是Reduce的输入类型)*/
public void map(LongWritable k1, Text v1, Context context)
throws java.io.IOException, java.lang.InterruptedException
//map()函数是固定模式的,三个参数
{
// 1 2
String[] lines= v1.toString().split("\t");
// \t 在同一个缓冲区内横向跳8个空格(Tab键);split()方法用于把一个字符串分割
//成字符串数组;v1指的是一行,把第一行的两个单词存进lines
if(lines.length!=2 || lines[0].trim().equals("child"))
return; //child parent
String word1=lines[0].trim(); //tom ->去掉 lines[0]里面的空格符
String word2=lines[1].trim(); //lucy
context.write(new Text(word1), new Text("1"+","+word1+","+word2));
//第一个Text对应Mapper的第三个Text;第二个Text对应Mapper的第四个Text
context.write(new Text(word2), new Text("2"+","+word1+","+word2));
//tom,1+tom+lucy
System.out.println("map......"+word1+"-"+word2);
}
}
private static class MyGLReduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context)
throws java.io.IOException, java.lang.InterruptedException
//context:上下文对象,在整个wordcount运算生命周期内存活
{
List<String> grandch=new ArrayList(); //泛型
List<String> grandpa=new ArrayList();
/* lucy 2+tom+lucy
lucy 1+lucy+mary
2->split[1] tom 2的话取1
1->split[2] mary 1的话取2
k3=tom v3=mary 把这两个放在上下文
*/
Iterator<Text> it=values.iterator();
// Iterator<Text>--输进来的第二个值
while(it.hasNext()){
String lines= it.next().toString();
//2,tom,lucy(对应MyGLMapper的context.write())
String [] words=lines.split(",");
//劈开 string 数组 ["2","tom","lucy"]
if(words[0].equals("1")){
grandpa.add(words[2]);
}else if(words[0].equals("2")){
grandch.add(words[1]);
}
else
return;
}
for(String ch:grandch)
for(String pa:grandpa)
context.write(new Text(ch), new Text(pa));
System.out.println("reduce......");
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException{
}
}
private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
private static String OUTPUT_PATH="hdfs://master:9000/output/c/";
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
if(fs.exists(new Path(OUTPUT_PATH)))
fs.delete(new Path(OUTPUT_PATH));
Job job=new Job(conf,"myjob");
job.setJarByClass(MyGL.class);
job.setMapperClass(MyGLMapper.class);
job.setReducerClass(MyGLReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setCombinerClass(MyReduce.class);
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
}
}
(Figure omitted in text extraction: image.png — job data-flow diagram from the original post.)
When words[0] == "1", words[2] (the parent field) is a candidate grandparent of the key;
when words[0] == "2", words[1] (the child field) is a candidate grandchild of the key.
Test data:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
Reader comments