I. The Join Operation
A join is a common RDBMS operation: it combines records from multiple tables (here, files) that share the same key. In this post we use MapReduce to join two files.
II. How It Works
MapReduce can implement a join in several ways; here we use the most common one, the reduce-side join:
- On the map side, the join field is tagged according to which file the record came from and emitted as the key; the rest of the record is emitted as the value.
- On the reduce side, the join field is extracted from the tagged key and used as the output key; the grouped values are combined into the output values.
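Concretely, with the sample files from the next section, the map phase emits composite keys like the following (a schematic trace of the tagging scheme, not program output; \t denotes a tab):

data.txt line "201001\t1003\tabc"  ->  key = (1003, "1"), value = "201001\tabc"
info.txt line "1003\tkaka"         ->  key = (1003, "0"), value = "kaka"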
III. Example
1. Input files
Fields in both files are tab-separated; the join key is the second column of data.txt and the first column of info.txt.
data.txt:
201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr
info.txt:
1003 kaka
1004 da
1005 jue
1006 zhao
2. Code
Each class below lives in its own source file under the package Mapreduce_Join.
MR_Join_Comparator.java:
package Mapreduce_Join;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// Grouping comparator: compares only the first field (the join key), so all
// values sharing a join key reach a single reduce() call regardless of tag.
public class MR_Join_Comparator extends WritableComparator {
    public MR_Join_Comparator() {
        super(TextPair.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextPair tp1 = (TextPair) a;
        TextPair tp2 = (TextPair) b;
        return tp1.getFirst().compareTo(tp2.getFirst());
    }
}
MR_Join_Mapper.java:
package Mapreduce_Join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MR_Join_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Determine which input file this record came from.
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (path.contains("data.txt")) {
            // data.txt: <id> TAB <pid> TAB <desc>; the join key is the second field.
            String[] values = value.toString().split("\t");
            if (values.length < 3) {
                return; // skip malformed records
            }
            // Tag "1" marks records originating from data.txt.
            TextPair tpPair = new TextPair(new Text(values[1]), new Text("1"));
            context.write(tpPair, new Text(values[0] + "\t" + values[2]));
        }
        if (path.contains("info.txt")) {
            // info.txt: <pid> TAB <name>; the join key is the first field.
            String[] values = value.toString().split("\t");
            if (values.length < 2) {
                return; // skip malformed records
            }
            // Tag "0" marks records from info.txt; "0" sorts before "1", so the
            // info record arrives first within each reduce group.
            TextPair tpPair = new TextPair(new Text(values[0]), new Text("0"));
            context.write(tpPair, new Text(values[1]));
        }
    }
}
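The mapper distinguishes its two inputs by inspecting the split path. A common alternative (not used here) is to register one dedicated mapper per input file with MultipleInputs, which avoids the string check. A minimal sketch of the driver-side change, where DataMapper and InfoMapper are hypothetical classes holding the data.txt and info.txt branches of the logic above:

// Hypothetical variant; these lines would replace the two
// FileInputFormat.addInputPath calls in the driver below.
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
// ... inside main():
MultipleInputs.addInputPath(job, inpath1, TextInputFormat.class, DataMapper.class);
MultipleInputs.addInputPath(job, inpath2, TextInputFormat.class, InfoMapper.class);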
MR_Join_Partitioner.java:
package Mapreduce_Join;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Partition on the join field only, so records with the same key
// (from either file) land in the same reduce partition.
public class MR_Join_Partitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        // Mask off the sign bit rather than calling Math.abs():
        // Math.abs(Integer.MIN_VALUE) is still negative and would
        // produce an invalid partition number.
        return ((key.getFirst().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
    }
}
MR_Join_Reduce.java:
package Mapreduce_Join;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MR_Join_Reduce extends Reducer<TextPair, Text, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Text pid = key.getFirst();
        // Obtain the iterator exactly once: a reduce group's values can only
        // be traversed a single time, so calling values.iterator() repeatedly
        // would skip or lose records.
        Iterator<Text> it = values.iterator();
        if (!it.hasNext()) {
            return;
        }
        // Secondary sort guarantees the "0"-tagged info.txt record comes first.
        String desc = it.next().toString();
        while (it.hasNext()) {
            context.write(pid, new Text(it.next().toString() + "\t" + desc));
        }
    }
}
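As a hand trace over the sample data (the two "1"-tagged values compare equal, so their relative order is not guaranteed), the reduce group for join key 1003 behaves as follows:

values in tag order: "kaka", "201001\tabc", "201004\tjkl"
desc = "kaka"                     (taken from the first, "0"-tagged value)
emits: 1003 -> "201001\tabc\tkaka"
       1003 -> "201004\tjkl\tkaka"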
TextPair.java:
package Mapreduce_Join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
// Composite key: first = join field, second = source tag
// ("0" = info.txt, "1" = data.txt).
public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }
    public Text getFirst() {
        return first;
    }
    public Text getSecond() {
        return second;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }
    @Override
    public int compareTo(TextPair o) {
        // Sort by join field first, then by tag, so "0" (info.txt)
        // precedes "1" (data.txt) within each key.
        int cmp = first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(o.second);
    }
}
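This job never relies on TextPair's hashCode() or equals() because it installs its own partitioner, but if the key were ever used with the default HashPartitioner (or in hash-based collections), both should be overridden consistently. A minimal sketch of what could be added to TextPair:

// Hypothetical additions; only needed if the default HashPartitioner
// or hash-based collections were used with this key.
@Override
public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
    if (o instanceof TextPair) {
        TextPair tp = (TextPair) o;
        return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
}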
MRJoin.java:
package Mapreduce_Join;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRJoin {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "MRJoin");
        job.setJarByClass(MRJoin.class);
        job.setMapperClass(MR_Join_Mapper.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(MR_Join_Partitioner.class);
        // Group values by the join field only, ignoring the source tag.
        job.setGroupingComparatorClass(MR_Join_Comparator.class);
        job.setReducerClass(MR_Join_Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths are hard-coded here for simplicity.
        Path inpath1 = new Path("hdfs://localhost:9000/user/abc/data/data.txt");
        Path inpath2 = new Path("hdfs://localhost:9000/user/abc/data/info.txt");
        Path outpath = new Path("hdfs://localhost:9000/user/abc/data/out");
        FileInputFormat.addInputPath(job, inpath1);
        FileInputFormat.addInputPath(job, inpath2);
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
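With the sample inputs, the final output (TextOutputFormat, key and value separated by a tab) should contain the following six lines, derived by hand from the sample data; the line order and their split across part-r-* files depend on the number of reducers:

1003	201001	abc	kaka
1003	201004	jkl	kaka
1004	201005	mno	da
1005	201002	def	jue
1005	201006	pqr	jue
1006	201003	ghi	zhao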
Execution flow of a MapReduce program:
① Configuration
② Create the job
③ Set the JAR
④ Set the Mapper
⑤ Set the Partitioner and grouping Comparator
⑥ Set the Reducer
⑦ Set the input and output paths
⑧ Run the job