欢迎关注我的CSDN: https://blog.csdn.net/bingque6535
一. MapReduce Join
对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。
如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。
1. 思路
1. reduce join
在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。这种方法有2个问题:
- map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
- reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
2. map join
两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。
这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。这种方法有明显的局限性:
- 有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
3. 使用内存服务器,扩大节点的内存空间
针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接
4. 使用BloomFilter过滤空连接的数据
对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。
5. 使用mapreduce专为join设计的包
在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。
- jar: mapreduce-client-core.jar
- package: org.apache.hadoop.mapreduce.lib.join
二. 代码实现
1. 问题:
-
输入文件格式说明:
-
movies.csv
在这里插入图片描述 -
ratting.csv
在这里插入图片描述
-
-
输出文件格式说明:
在这里插入图片描述
2. 实现reduce join
题目: 将两个文件(movies.csv 和 ratings.csv)中形同电影ID的数据合并成一条数据
-
Driver端
package com.hjf.mr.movie; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-18 14:09 */ public class MovieDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MovieDriver.class); job.setMapperClass(MovieMapper.class); job.setReducerClass(MovieReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath1 = new Path("./Data/movie/movies.csv"); Path inputPath2 = new Path("./Data/movie/ratings.csv"); Path outputPath = new Path("./Data/result"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath1, inputPath2); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } }
-
Mapper 端
package com.hjf.mr.movie; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-18 14:12 */ public class MovieMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); StringBuilder sb = new StringBuilder(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取文件信息: 文件路径/文件名:0+XXX FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 获取文件名 String fileName = inputSplit.getPath().getName(); String[] split = value.toString().split(","); sb.setLength(0); /** * movies.csv文件中每条数据的格式: * movieId title genres * 1 Toy Story (1995) Adventure|Animation|Children|Comedy|Fantasy */ if (fileName.equals("movies.csv")) { // movieId 作为key outKey.set(split[0]); // title genres StringBuilder append = sb.append(split[1]).append("\t").append(split[2]); String str = "movies#" + append.toString(); // "movie#title genres" 作为value outValue.set(str); context.write(outKey, outValue); } else if(fileName.equals("ratings.csv")) { /** * ratings.csv文件中每条数据的格式: * userId movieId rating timestamp * 1 1 4 964982703 */ // movieId 作为key outKey.set(split[1]); // userId rating timestamp StringBuilder append = sb.append(split[0]).append("\t").append(split[2]).append("\t").append(split[3]); String str = "ratings#" + append.toString(); // "ratings#userId rating timestamp" 作为value outValue.set(str); context.write(outKey, outValue); } } }
-
Reducer 端
package com.hjf.mr.movie; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author Jiang锋时刻 * @create 2020-05-18 15:20 */ public class MovieReducer extends Reducer<Text, Text, Text, Text> { List<String> moviesList = new ArrayList<>(); List<String> ratingsList = new ArrayList<>(); Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value: values){ // 将value中以"movies#"开头的放在moviesList列表中 if (value.toString().startsWith("movies#")) { String string = value.toString().split("#")[1]; moviesList.add(string); } else if (value.toString().startsWith("ratings#")){ // 将value中以"ratings#"开头的放在ratingsList列表中 String string = value.toString().split("#")[1]; ratingsList.add(string); } } // 获取两个集合的长度 int moviesSize = moviesList.size(); int ratingsSize = ratingsList.size(); // 将相同movieId 的两个表中的数据进行合并 for (int i = 0; i < moviesSize; i++) { for (int j = 0; j < ratingsSize; j++) { outValue.set(moviesList.get(i) + "\t" + ratingsList.get(j)); context.write(key, outValue); } } // 清空列表 moviesList.clear(); ratingsList.clear(); } }
3. 实现map join
题目: 求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
编写的代码运行不了, 等有时间了再好好分析
欢迎关注我的CSDN: https://blog.csdn.net/bingque6535
网友评论