11.mapreduce的join
1. reduce join
(1)、需求
订单数据表t_order:
id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0002 | 3 |
1002 | 20150710 | P0003 | 3 |
商品信息表t_product
id | pname | category_id | price |
---|---|---|---|
P0001 | 小米5 | 1000 | 2000 |
P0002 | 锤子T1 | 1000 | 3000 |
假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
(2)、实现机制
- 通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
- 定义Mapper:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//现在我们读取了两个文件,如何确定当前处理的这一行数据是来自哪一个文件里面的
//方式一:通过获取文件的切片,获得文件明
/*
FileSplit inputSplit = (FileSplit) context.getInputSplit();//获取我们输入的文件的切片
//获取文件名称
String name = inputSplit.getPath().getName();
if (name.equals("orders.txt")) {
//订单表数据
} else {
//商品表数据
}
*/
String[] split = value.toString().split(",");
//方式二:因为t_product表,都是以p开头,所以可以作为判断的依据
if (value.toString().startsWith("p")) {
//p0002,锤子T1,1000,3000
//以商品id作为key2,相同商品的数据都会到一起去
context.write(new Text(split[0]), value);
} else {
//order
// 1001,20150710,p0001,2
context.write(new Text(split[2]), value);
}
}
}
- 定义Reducer:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ReduceJoinReducer extends Reducer<Text,Text,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String order = "";
String product = "";
for (Text value : values) {
if (value.toString().startsWith("p")) {
product = value.toString();
}else {
order = value.toString();
}
}
// 将结果保存
context.write(new Text(order + "\t" + product),NullWritable.get());
}
}
- 定义main方法
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReduceJoinMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取job对象
Job job = Job.getInstance(super.getConf(), ReduceJoinMain.class.getSimpleName());
job.setJarByClass(ReduceJoinMain.class);
//第一步:读取文件
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
//第二步:设置自定义mapper逻辑
job.setMapperClass(ReduceJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//分区,排序,规约,分组 省略
//第七步:设置reduce逻辑
job.setReducerClass(ReduceJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//第八步:设置输出数据路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
System.exit(run);
}
}
2. map join
(1)、原理阐述
-
适用于关联表中有小表的情形;
-
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度
-
将小文件上传到hdfs中,然后在Main函数中将文件添加到缓存中,然后使用java io的方式读取文件;
在Map()函数中将要实现join的字段放在key中,从而实现两个文件之间的join。
(2)、实现示例
- 先在mapper类中预先定义好小表,进行join
- 引入实际场景中的解决方案:一次加载数据库或者用
- 定义mapper类:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
//用于保存商品表的数据;productMap中的key是商品id,value是与key对应的表记录
private Map<String, String> productMap;
/**
* 初始化方法,只在程序启动调用一次
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
productMap = new HashMap<String, String>();
// 通过context上下文对象去 获取配置文件对象
Configuration configuration = context.getConfiguration();
// 通过Job对象去 获取到所有的缓存文件
// 方式一
URI[] cacheFiles = Job.getInstance(context.getConfiguration()).getCacheFiles();
//方式二:deprecated
//URI[] cacheFiles = DistributedCache.getCacheFiles(configuration);
//现在只有一个缓存文件放进了分布式缓存中
URI cacheFile = cacheFiles[0];
//获取FileSystem
FileSystem fileSystem = FileSystem.get(cacheFile, configuration);
//读取文件,获取到输入流。这里面装的都是商品表的数据
FSDataInputStream fsDataInputStream = fileSystem.open(new Path(cacheFile));
/**
* 商品表数据如下:
* p0001,xiaomi,1000,2
* p0002,appale,1000,3
* p0003,samsung,1000,4
*/
//获取到BufferedReader之后,可以一行一行的读取数据
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream));
String line = null;
//每次循环,获得表的一行数据
while ((line = bufferedReader.readLine()) != null) {
String[] split = line.split(",");
// productMap中的key是商品id,value是与key对应的表记录
productMap.put(split[0], line);
}
}
/**
* @param key
* @param value 订单表的记录,如 1001,20150710,p0001,2
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
//获取订单表的商品id
String pid = split[2];
//获取商品表的数据
String productLine = productMap.get(pid);
System.out.println("Value中的数据:" + value.toString());
System.out.println("商品表中的数据:" + productLine);
// 此时把所有的数据直接拼接在一起了,然后便于观察最后的实际拼接的结果;在实际条件下其实也可以使用这种方式,然后在测试的没有问题之后的,可以再把这个删除
context.write(new Text(value.toString() + "\t" + productLine), NullWritable.get());
}
}
- 定义main方法:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
/**
* 需求:同reduce join
* 实现:select a.id, a.date, b.name, b.category_id, b.price
* from t_order a
* join t_product b
* on a.pid = b.id
*/
public class MapJoinMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//分布式缓存的hdfs路径
URI uri = new URI("hdfs://node01:8020/cache/product.txt");
// 本地路径:需要用一下形式file:///C:/1、HK/.../pdts.txt ;需要指定到具体的文件;如果是使用file:///c:\\1、HK\\3、ME...不符合语法会报错
// URI uri = new URI("file:///C:/1、HK/3、ME/2、高级0x/1、Hadoop集群升级课件/9、MapReduce/MR第一次/12、join操作/map端join/cache/pdts.txt");
Configuration configuration = super.getConf();
// 添加缓存文件 方式二:deprecated
// DistributedCache.addCacheFile(uri, configuration);
//获取job对象
Job job = Job.getInstance(configuration, MapJoinMain.class.getSimpleName());
//添加缓存文件:方式一
job.addCacheFile(uri);
job.setJarByClass(MapJoinMain.class);
//读取文件,解析成为key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//没有reducer逻辑,不用设置了
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(2);
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new MapJoinMain(), args);
System.exit(run);
}
}
网友评论