美文网首页大数据学习
大数据学习之Hadoop——10MapReduce实现Reduc

大数据学习之Hadoop——10MapReduce实现Reduc

作者: Jiang锋时刻 | 来源:发表于2020-05-30 02:12 被阅读0次

    欢迎关注我的CSDN: https://blog.csdn.net/bingque6535

    一. MapReduce Join

    对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。
    如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。

    1. 思路

    1. reduce join

    在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。这种方法有2个问题:

    1. map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
    2. reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
    2. map join

    两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。

    这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。这种方法有明显的局限性:

    1. 有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。

    3. 使用内存服务器,扩大节点的内存空间

    针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接

    4. 使用BloomFilter过滤空连接的数据

    对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。

    5. 使用mapreduce专为join设计的包

    在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。

    1. jar: mapreduce-client-core.jar
    2. package: org.apache.hadoop.mapreduce.lib.join

    二. 代码实现

    1. 问题:

    1. 输入文件格式说明:

      1. movies.csv


        在这里插入图片描述
      2. ratting.csv


        在这里插入图片描述
    2. 输出文件格式说明:


      在这里插入图片描述

    2. 实现reduce join

    题目: 将两个文件(movies.csv 和 ratings.csv)中形同电影ID的数据合并成一条数据

    1. Driver端

      package com.hjf.mr.movie;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
      import java.io.IOException;
      
      /**
       * @author Jiang锋时刻
       * @create 2020-05-18 14:09
       */
      public class MovieDriver {
          public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              job.setJarByClass(MovieDriver.class);
      
              job.setMapperClass(MovieMapper.class);
              job.setReducerClass(MovieReducer.class);
      
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(Text.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
      
              Path inputPath1 = new Path("./Data/movie/movies.csv");
              Path inputPath2 = new Path("./Data/movie/ratings.csv");
              Path outputPath = new Path("./Data/result");
      
              FileSystem fs = FileSystem.get(conf);
              if (fs.exists(outputPath)) {
                  fs.delete(outputPath, true);
              }
      
              FileInputFormat.setInputPaths(job, inputPath1, inputPath2);
              FileOutputFormat.setOutputPath(job, outputPath);
      
              job.waitForCompletion(true);
              
          }
      }
      
      
    2. Mapper 端

      package com.hjf.mr.movie;
      
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
      
      import java.io.IOException;
      
      /**
       * @author Jiang锋时刻
       * @create 2020-05-18 14:12
       */
      public class MovieMapper extends Mapper<LongWritable, Text, Text, Text> {
          Text outKey = new Text();
          Text outValue = new Text();
          StringBuilder sb = new StringBuilder();
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              // 获取文件信息: 文件路径/文件名:0+XXX
              FileSplit inputSplit = (FileSplit) context.getInputSplit();
              // 获取文件名
              String fileName = inputSplit.getPath().getName();
              String[] split = value.toString().split(",");
              sb.setLength(0);
      
              /**
               * movies.csv文件中每条数据的格式:
               *  movieId     title               genres
               *  1           Toy Story (1995)    Adventure|Animation|Children|Comedy|Fantasy
               */
              if (fileName.equals("movies.csv")) {
                  // movieId 作为key
                  outKey.set(split[0]);
      
                  // title    genres
                  StringBuilder append = sb.append(split[1]).append("\t").append(split[2]);
                  String str = "movies#" + append.toString();
                  // "movie#title  genres" 作为value
                  outValue.set(str);
      
                  context.write(outKey, outValue);
      
              } else if(fileName.equals("ratings.csv")) {
                  /**
                   * ratings.csv文件中每条数据的格式:
                   *  userId  movieId     rating  timestamp
                   *  1       1           4       964982703
                   */
                  // movieId 作为key
                  outKey.set(split[1]);
                  // userId   rating  timestamp
                  StringBuilder append = sb.append(split[0]).append("\t").append(split[2]).append("\t").append(split[3]);
                  String str = "ratings#" + append.toString();
                  // "ratings#userId  rating  timestamp" 作为value
                  outValue.set(str);
      
                  context.write(outKey, outValue);
              }           
          }
      }
      
      
    3. Reducer 端

      package com.hjf.mr.movie;
      
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;
      
      /**
       * @author Jiang锋时刻
       * @create 2020-05-18 15:20
       */
      public class MovieReducer extends Reducer<Text, Text, Text, Text> {
          List<String> moviesList = new ArrayList<>();
          List<String> ratingsList = new ArrayList<>();
          Text outValue = new Text();
      
          @Override
          protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
              for (Text value: values){
                  // 将value中以"movies#"开头的放在moviesList列表中
                  if (value.toString().startsWith("movies#")) {
                      String string = value.toString().split("#")[1];
                      moviesList.add(string);
                  } else if (value.toString().startsWith("ratings#")){
                      // 将value中以"ratings#"开头的放在ratingsList列表中
                      String string = value.toString().split("#")[1];
                      ratingsList.add(string);
                  }
              }
      
              // 获取两个集合的长度
              int moviesSize = moviesList.size();
              int ratingsSize = ratingsList.size();
              // 将相同movieId 的两个表中的数据进行合并
              for (int i = 0; i < moviesSize; i++) {
                  for (int j = 0; j < ratingsSize; j++) {
                      outValue.set(moviesList.get(i) + "\t" + ratingsList.get(j));
                      context.write(key, outValue);
                  }
              }
              // 清空列表
              moviesList.clear();
              ratingsList.clear();
          }
      }
      
      

    3. 实现map join

    题目: 求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)

    编写的代码运行不了, 等有时间了再好好分析

    欢迎关注我的CSDN: https://blog.csdn.net/bingque6535

    相关文章

      网友评论

        本文标题:大数据学习之Hadoop——10MapReduce实现Reduc

        本文链接:https://www.haomeiwen.com/subject/jwgqzhtx.html