Reduce-side Join
package com.wenhuan.reducejoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class ReduceJoin {

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        /*
         * Map side: tag each record with its source file before emitting it.
         */
        // map() needs to know which file each record comes from, so the
        // file name is resolved once in setup(), before map() is called.
        String filename = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the input split for this task
            InputSplit inputsplit = context.getInputSplit();
            // Cast to FileSplit to obtain the underlying file name
            FileSplit fs = (FileSplit) inputsplit;
            filename = fs.getPath().getName();
        }

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Parse each line and tag it according to its source file
            String[] infos = value.toString().split("\t");
            if (filename.equals("orders")) {
                // Order record: key is the join field, value is tagged with "OR"
                k.set(infos[0]);
                v.set("OR" + infos[1] + "\t" + infos[2] + "\t" + infos[3]);
                context.write(k, v);
            } else {
                // Product record: key is the join field, value is tagged with "PR"
                k.set(infos[0]);
                v.set("PR" + infos[1] + "\t" + infos[2]);
                context.write(k, v);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, NullWritable> {
        Text k = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            /*
             * Join relationship: one-to-many, one product corresponds to many orders.
             */
            // Collect the two sides of the join into separate lists
            List<String> orderList = new ArrayList<String>();
            List<String> proList = new ArrayList<String>();
            for (Text v : values) {
                String vv = v.toString();
                if (vv.startsWith("OR")) {
                    orderList.add(vv.substring(2));
                } else {
                    proList.add(vv.substring(2));
                }
            }
            // Emit the cross product of the two sides for this join key
            if (orderList.size() > 0 && proList.size() > 0) {
                for (String ol : orderList) {
                    for (String pl : proList) {
                        String res = key.toString() + "\t" + ol + "\t" + pl;
                        k.set(res);
                        context.write(k, NullWritable.get());
                    }
                }
            }
        }
    }

    // ... (the driver/main method is omitted here; a sketch follows below)
}
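The original post omits the driver for the reduce-side join. The following is only a minimal sketch of what such a driver could look like, modeled on the map-join driver further below; the wiring is inferred from the mapper and reducer above and is an assumption, not the author's original code. It also needs the Configuration, Job, Path, FileInputFormat and FileOutputFormat imports shown in the map-join listing.
    public static void main(String[] args) throws Exception {
        // Driver sketch (assumed, not the original): wire up mapper, reducer and paths
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The map output types (Text/Text) differ from the final output types
        // (Text/NullWritable), so both must be declared.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }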
Map-side Join
Compared with the reduce-side join, we can instead load the small table into the memory of every running map task. The map side then only reads the large table: as each line of the large table is read, it is joined directly against the in-memory copy of the small table, so the whole join is finished in the map phase.
In the driver, job.addCacheFile(uri) adds the file at the given URI to the distributed cache.
The small table can then be read back in the mapper's setup() method.
package com.wenhuan.mapjoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Mapjoin {

    static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // setup() reads the cached small table and loads it into this map
        Map<String, String> map = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the local path of the cached file
            Path path = context.getLocalCacheFiles()[0];
            String p = path.toString();
            BufferedReader br = new BufferedReader(new FileReader(p));
            String line = null;
            while ((line = br.readLine()) != null) {
                // key: product id, value: the remaining product fields
                String[] infos = line.split("\t");
                map.put(infos[0], infos[1] + "\t" + infos[2]);
            }
            br.close();
        }

        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Parse each line of the large table
            String[] infos = value.toString().split("\t");
            String pid = infos[2];
            // Join against the in-memory small table
            if (map.containsKey(pid)) {
                String res = value.toString() + "\t" + map.get(pid);
                k.set(res);
                context.write(k, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Load the configuration
        Configuration conf = new Configuration();
        // Create a Job that wires up the mapper
        Job job = Job.getInstance(conf);
        // Set the driver class; the program is packaged as a jar when run
        job.setJarByClass(Mapjoin.class);
        // Set the mapper; this is a map-only job, so no reducer is configured
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Add the small table to the distributed cache
        job.addCacheFile(new URI("/info/product"));
        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and print progress
        job.waitForCompletion(true);
    }
}
Since the map join needs no reduce phase, the number of reduce tasks must be set to 0 (job.setNumReduceTasks(0)), which makes this a map-only job.
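Note that getLocalCacheFiles() is deprecated in newer Hadoop releases. As a hedged alternative, the setup() method could use context.getCacheFiles() instead; the sketch below assumes Hadoop 2.x or later, where a cached file is by default symlinked into the task's working directory under its base name (or under the alias given after '#'), so it can be opened by that name.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // URIs exactly as passed to job.addCacheFile()
            URI[] cacheFiles = context.getCacheFiles();
            // Base name of the cached file, e.g. "product"; the localized copy
            // is available under this name in the task's working directory.
            String name = new Path(cacheFiles[0].getPath()).getName();
            BufferedReader br = new BufferedReader(new FileReader(name));
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] infos = line.split("\t");
                map.put(infos[0], infos[1] + "\t" + infos[2]);
            }
            br.close();
        }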