
8_大数据之MapReduce_3

Author: 十丈_红尘 | Published 2019-06-24 21:28

    Join: Various Applications

    1️⃣ Reduce Join

    2️⃣ Reduce Join: Hands-On Example
    1. Requirement
    //order.txt
    1001   01  1
    1002   02  2
    1003   03  3
    1004   01  4
    1005   02  5
    1006   03  6
    
    //pd.txt
    01 小米
    02 华为
    03 格力
    
    Merge the product information (pd.txt) into the order table (order.txt), matching on the product pid.
    2. Analysis
    Use the join key (pid) as the Map output key, tag each record with the file it came from, and send both tables' matching records to the same ReduceTask, where the records are stitched together. For example, for pid 01 the reducer receives the pd.txt record (01, 小米) together with order records 1001 and 1004.
    3. Implementation
    1) Create a Bean for the merged order/product record
    package com.xxx.reducejoin;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class OrderBean implements WritableComparable<OrderBean> {
       private String id;
       private String pid;
       private int amount;
       private String pname;
    
       @Override
       public String toString() {
           return id + "\t" + pname + "\t" + amount;
       }
    
       public String getId() {
           return id;
       }
    
       public void setId(String id) {
           this.id = id;
       }
    
       public String getPid() {
           return pid;
       }
    
       public void setPid(String pid) {
           this.pid = pid;
       }
    
       public int getAmount() {
           return amount;
       }
    
       public void setAmount(int amount) {
           this.amount = amount;
       }
    
       public String getPname() {
           return pname;
       }
    
       public void setPname(String pname) {
           this.pname = pname;
       }
    
       //Sort by pid; within a pid group sort by pname in descending order, so records that have a pname (from pd.txt) come first
       @Override
       public int compareTo(OrderBean o) {
           int compare = this.pid.compareTo(o.pid);
           if (compare == 0) {
               return o.getPname().compareTo(this.getPname());
           } else {
               return compare;
           }
       }
    
       @Override
       public void write(DataOutput out) throws IOException {
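           //Note: the order of the writes here must match the order of the reads in readFields()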
           out.writeUTF(id);
           out.writeUTF(pid);
           out.writeInt(amount);
           out.writeUTF(pname);
       }
    
       @Override
       public void readFields(DataInput in) throws IOException {
           id = in.readUTF();
           pid = in.readUTF();
           amount = in.readInt();
           pname = in.readUTF();
       }
    }
    

    2) Write the Mapper (OrderMapper)

    package com.xxx.reducejoin;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    
       private String filename;
    
       private OrderBean order = new OrderBean();
    
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
           
           //Get the name of the file this split comes from
           FileSplit fs = (FileSplit) context.getInputSplit();
           filename = fs.getPath().getName();
       }
    
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           String[] fields = value.toString().split("\t");
           
           //Handle records from the two source files differently
           if ("order.txt".equals(filename)) {
               order.setId(fields[0]);
               order.setPid(fields[1]);
               order.setAmount(Integer.parseInt(fields[2]));
               order.setPname("");
           } else {
               order.setPid(fields[0]);
               order.setPname(fields[1]);
               order.setAmount(0);
               order.setId("");
           }
    
           context.write(order, NullWritable.get());
       }
    }
    

    3) Write the Reducer (OrderReducer)

    package com.xxx.reducejoin;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    
       @Override
       protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
           
           //Because of the sort order, the first record in each pid group comes from pd.txt; the rest come from order.txt
           Iterator<NullWritable> iterator = values.iterator();
           
           //Take pname from the first (pd.txt) record
           iterator.next();
           String pname = key.getPname();
           
           //Iterate over the remaining (order.txt) records; Hadoop reuses the key object,
           //so after each next() the key holds the current order record: fill in pname and write it out
           while (iterator.hasNext()) {
               iterator.next();
               key.setPname(pname);
               context.write(key,NullWritable.get());
           }
       }
    
    
    }
    

    4) Write the Driver (OrderDriver)

    package com.xxx.reducejoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class OrderDriver {
       public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
           Job job = Job.getInstance(new Configuration());
           job.setJarByClass(OrderDriver.class);
    
           job.setMapperClass(OrderMapper.class);
           job.setReducerClass(OrderReducer.class);
           job.setGroupingComparatorClass(OrderComparator.class);
    
           job.setMapOutputKeyClass(OrderBean.class);
           job.setMapOutputValueClass(NullWritable.class);
    
           job.setOutputKeyClass(OrderBean.class);
           job.setOutputValueClass(NullWritable.class);
    
           FileInputFormat.setInputPaths(job, new Path("d:\\input"));
           FileOutputFormat.setOutputPath(job, new Path("d:\\output"));
    
           boolean b = job.waitForCompletion(true);
    
           System.exit(b ? 0 : 1);
    
       }
    }
    

    4. Test

    1001    小米  1   
    1001    小米  1   
    1002    华为  2   
    1002    华为  2   
    1003    格力  3   
    1003    格力  3
    

    5. Summary

    3️⃣ Map Join
    1. When to use: Map Join fits the case where one table is very small and the other is very large.
    2. Why:
      Think about it: joining many tables on the Reduce side very easily causes data skew. What can be done?
      Cache the small tables on the Map side and apply the join logic there. This shifts work to the Map stage, relieves the data pressure on the Reduce stage, and reduces data skew as much as possible.

    3. Concretely: use the DistributedCache
    (1) In the Mapper's setup() phase, read the cached file into an in-memory collection.
    (2) In the driver, register the file with the cache:
      // Cache a plain file on the nodes where the tasks run.
      job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
    4️⃣ Map Join: Hands-On Example

    1. Requirement
    // order.txt
    1001   01  1
    1002   02  2
    1003   03  3
    1004   01  4
    1005   02  5
    1006   03  6
    
    // pd.txt
    01 小米
    02 华为
    03 格力
    

    Merge the product information into the order table, matching on the product pid.

    // Final output format
    id     pname   amount
    1001   小米        1
    1004   小米        4
    1002   华为        2
    1005   华为        5
    1003   格力        3
    1006   格力        6
    

    2. Analysis: Map Join applies when one of the joined tables is small enough to be cached in memory.

    3. Implementation
    (1) First add the cache file in the driver
    package com.xxx.mapjoin;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class DistributedCacheDriver {
    
      public static void main(String[] args) throws Exception {
          
          // 0 Adjust these paths to match your own machine
          args = new String[]{"e:/input/inputtable2", "e:/output1"};
    
          // 1 Get the job instance
          Configuration configuration = new Configuration();
          Job job = Job.getInstance(configuration);
    
          // 2 Set the jar by class
          job.setJarByClass(DistributedCacheDriver.class);
    
          // 3 Set the mapper
          job.setMapperClass(MjMapper.class);
          
          // 4 Set the final output types
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(NullWritable.class);
    
          // 5 Set the input and output paths
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
          // 6 Add the cache file
          job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));
          
          // 7 A map-side join needs no Reduce phase, so set the number of reduce tasks to 0
          job.setNumReduceTasks(0);
    
          // 8 Submit the job
          boolean result = job.waitForCompletion(true);
          System.exit(result ? 0 : 1);
      }
    }
    

    (2) Read the cached file data (the Mapper)

    package com.xxx.mapjoin;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    public class MjMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
       //In-memory cache of the pd table
       private Map<String, String> pMap = new HashMap<>();
    
       private Text line = new Text();
    
       //Before the task starts, load the pd data into pMap
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
           
           //Locate pd.txt among the cache files
           URI[] cacheFiles = context.getCacheFiles();
           Path path = new Path(cacheFiles[0]);
    
           //Get the file system and open an input stream
           FileSystem fileSystem = FileSystem.get(context.getConfiguration());
           FSDataInputStream fsDataInputStream = fileSystem.open(path);
    
           //Wrap the stream in a reader
           BufferedReader bufferedReader = new BufferedReader(
                   new InputStreamReader(fsDataInputStream, "utf-8"));
    
           //Read and process the file line by line
           String line;
           while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
               String[] fields = line.split("\t");
               pMap.put(fields[0], fields[1]);
           }
    
           //Close the stream
           IOUtils.closeStream(bufferedReader);
    
       }
    
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           String[] fields = value.toString().split("\t");
    
           String pname = pMap.get(fields[1]);
    
           line.set(fields[0] + "\t" + pname + "\t" + fields[2]);
    
           context.write(line, NullWritable.get());
    
       }
    }
    

    二 Counter Application
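
    Hadoop counters let a job report custom statistics alongside the built-in ones (records read, bytes written, and so on). The ETL Mapper below uses the string-based form, context.getCounter("map", "true").increment(1). As a minimal sketch (the class and enum names here are illustrative, not from the original post), the same idea with the type-safe enum form looks like this:

    package com.xxx.mapreduce.counter;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class CounterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
       //Each enum constant becomes a named counter, grouped under the enum's class name
       private enum LineCounter {VALID, INVALID}
    
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           if (value.getLength() > 0) {
               context.getCounter(LineCounter.VALID).increment(1);
               context.write(value, NullWritable.get());
           } else {
               context.getCounter(LineCounter.INVALID).increment(1);
           }
       }
    }

    The counter totals are printed together with the job statistics once waitForCompletion(true) returns, and they can also be viewed in the job history web UI.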


    三 Data Cleansing (ETL)

      Before running the core business MapReduce job, the data usually has to be cleansed first to remove records that do not meet the requirements. The cleansing step usually only needs a Mapper; no Reducer is required.
    ① Data cleansing hands-on example: simple parsing version

    1. Requirement: drop log lines that have 11 or fewer fields.
      (1) Input data
    //web.log
    194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
    163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
    222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
    222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
    183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    

    (2) Expected output: every remaining line has more than 11 fields
    2. Analysis: filter the input records against this rule in the Map phase.
    3. Implementation
    (1) Write LogMapper

    package com.xxx.mapreduce.weblog;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
      
      Text k = new Text();
      
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
          
          // 1 Get one line
          String line = value.toString();
          
          // 2 Parse the log line
          boolean result = parseLog(line,context);
          
          // 3 Skip invalid lines
          if (!result) {
              return;
          }
          
          // 4 Set the key
          k.set(line);
          
          // 5 Write out
          context.write(k, NullWritable.get());
      }
    
      // Parse a log line: valid means more than 11 space-separated fields
      private boolean parseLog(String line, Context context) {
    
          // 1 Split on spaces
          String[] fields = line.split(" ");
          
          // 2 Lines with more than 11 fields are valid
          if (fields.length > 11) {
    
              // Custom counters: group "map", counter names "true" / "false"
              context.getCounter("map", "true").increment(1);
              return true;
          }else {
              context.getCounter("map", "false").increment(1);
              return false;
          }
      }
    }
    

    (2) Write LogDriver

    package com.xxx.mapreduce.weblog;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogDriver {
    
      public static void main(String[] args) throws Exception {
    
          // Set the input and output paths to match your own machine
          args = new String[] { "e:/input/inputlog", "e:/output1" };
    
          // 1 Get the job instance
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf);
    
          // 2 Set the jar by class
          job.setJarByClass(LogDriver.class);
    
          // 3 Set the mapper
          job.setMapperClass(LogMapper.class);
    
          // 4 Set the final output types
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(NullWritable.class);
    
          // Set the number of reduce tasks to 0 (map-only job)
          job.setNumReduceTasks(0);
    
          // 5 Set the input and output paths
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
          // 6 Submit the job
          job.waitForCompletion(true);
      }
    }
    

    ② Data cleansing hands-on example: full parsing version
    1. Requirement: split each field of the web access log, drop invalid records, and output the filtered data according to the cleansing rules.
    (1) Input data

    //web.log
    194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
    163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
    222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
    222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
    183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    

    (2) Expected output: only valid records
    2. Implementation
    (1) Define a bean that holds the individual fields of a log record

    package com.xxx.mapreduce.log;
    
    public class LogBean {
      private String remote_addr;// client IP address
      private String remote_user;// client user name; "-" means not recorded
      private String time_local;// access time and time zone
      private String request;// requested URL and HTTP protocol
      private String status;// response status code; 200 means success
      private String body_bytes_sent;// size of the response body sent to the client
      private String http_referer;// the page the request was linked from
      private String http_user_agent;// client browser (user agent) information
    
      private boolean valid = true;// whether the record is valid
    
      public String getRemote_addr() {
          return remote_addr;
      }
    
      public void setRemote_addr(String remote_addr) {
          this.remote_addr = remote_addr;
      }
    
      public String getRemote_user() {
          return remote_user;
      }
    
      public void setRemote_user(String remote_user) {
          this.remote_user = remote_user;
      }
    
      public String getTime_local() {
          return time_local;
      }
    
      public void setTime_local(String time_local) {
          this.time_local = time_local;
      }
    
      public String getRequest() {
          return request;
      }
    
      public void setRequest(String request) {
          this.request = request;
      }
    
      public String getStatus() {
          return status;
      }
    
      public void setStatus(String status) {
          this.status = status;
      }
    
      public String getBody_bytes_sent() {
          return body_bytes_sent;
      }
    
      public void setBody_bytes_sent(String body_bytes_sent) {
          this.body_bytes_sent = body_bytes_sent;
      }
    
      public String getHttp_referer() {
          return http_referer;
      }
    
      public void setHttp_referer(String http_referer) {
          this.http_referer = http_referer;
      }
    
      public String getHttp_user_agent() {
          return http_user_agent;
      }
    
      public void setHttp_user_agent(String http_user_agent) {
          this.http_user_agent = http_user_agent;
      }
    
      public boolean isValid() {
          return valid;
      }
    
      public void setValid(boolean valid) {
          this.valid = valid;
      }
    
      @Override
      public String toString() {
    
          StringBuilder sb = new StringBuilder();
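          // "\001" (Ctrl-A) is used as the field separator below; it is also Hive's default field delimiter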
          sb.append(this.valid);
          sb.append("\001").append(this.remote_addr);
          sb.append("\001").append(this.remote_user);
          sb.append("\001").append(this.time_local);
          sb.append("\001").append(this.request);
          sb.append("\001").append(this.status);
          sb.append("\001").append(this.body_bytes_sent);
          sb.append("\001").append(this.http_referer);
          sb.append("\001").append(this.http_user_agent);
          
          return sb.toString();
      }
    }
    

    (2) Write LogMapper

    package com.xxx.mapreduce.log;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
      Text k = new Text();
      
      @Override
      protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {
    
          // 1 Get one line
          String line = value.toString();
          
          // 2 Parse the line and check whether it is valid
          LogBean bean = parseLog(line);
          
          if (!bean.isValid()) {
              return;
          }
          
          k.set(bean.toString());
          
          // 3 Write out
          context.write(k, NullWritable.get());
      }
    
      // Parse one log line into a LogBean
      private LogBean parseLog(String line) {
    
          LogBean logBean = new LogBean();
          
          // 1 Split on spaces: fields[0]=client IP, fields[3]=timestamp, fields[6]=request path,
          //   fields[8]=status, fields[9]=bytes sent, fields[10]=referer, fields[11..]=user agent
          String[] fields = line.split(" ");
          
          if (fields.length > 11) {
    
              // 2 Populate the bean
              logBean.setRemote_addr(fields[0]);
              logBean.setRemote_user(fields[1]);
              logBean.setTime_local(fields[3].substring(1));
              logBean.setRequest(fields[6]);
              logBean.setStatus(fields[8]);
              logBean.setBody_bytes_sent(fields[9]);
              logBean.setHttp_referer(fields[10]);
              
              if (fields.length > 12) {
                  logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
              }else {
                  logBean.setHttp_user_agent(fields[11]);
              }
              
              // A status of 400 or above indicates an HTTP error
              if (Integer.parseInt(logBean.getStatus()) >= 400) {
                  logBean.setValid(false);
              }
          }else {
              logBean.setValid(false);
          }
          
          return logBean;
      }
    }
    

    (3) Write LogDriver

    package com.xxx.mapreduce.log;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogDriver {
      public static void main(String[] args) throws Exception {
          
          // 1 Get the job instance
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf);
    
          // 2 Set the jar by class
          job.setJarByClass(LogDriver.class);
    
          // 3 Set the mapper
          job.setMapperClass(LogMapper.class);
    
          // 4 Set the final output types
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(NullWritable.class);
    
          // 5 Set the input and output paths
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
          // 6 Submit the job
          job.waitForCompletion(true);
      }
    }
    

    MapReduce Development Summary


    Hadoop Data Compression

    1️⃣ Overview

    2️⃣ Compression codecs supported by MapReduce
    3️⃣ Choosing a compression format: Gzip, Bzip2, Lzo, Snappy
    4️⃣ Where to compress: compression can be enabled at any stage of a MapReduce job (input, map output, or final reducer output).
    5️⃣ Compression parameter configuration
    6️⃣ Hands-on compression examples
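
    As a quick reference for 5️⃣ above, these are the standard Hadoop 2.x property keys that control compression (a sketch only; the codec choices are examples, and the same keys can equally be set in mapred-site.xml instead of in code):

    package com.xxx.mapreduce.compress;
    
    import org.apache.hadoop.conf.Configuration;
    
    public class CompressionConfigExample {
    
       public static Configuration compressedConf() {
           Configuration conf = new Configuration();
    
           // compress intermediate (map output) data
           conf.setBoolean("mapreduce.map.output.compress", true);
           conf.set("mapreduce.map.output.compress.codec",
                   "org.apache.hadoop.io.compress.SnappyCodec");
    
           // compress the final job output
           conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
           conf.set("mapreduce.output.fileoutputformat.compress.codec",
                   "org.apache.hadoop.io.compress.BZip2Codec");
    
           return conf;
       }
    }

    The two map-output keys are exactly the ones the WordCountDriver below sets programmatically; the reduce-output example further down sets the equivalent of the last two keys through FileOutputFormat.setCompressOutput and FileOutputFormat.setOutputCompressorClass.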
    1. Compressing and decompressing a data stream
    package com.xxx.mapreduce.compress;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    public class TestCompress {
    
      public static void main(String[] args) throws Exception {
          compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
    //     decompress("e:/hello.txt.bz2");
      }
    
      // 1. Compress a file
      private static void compress(String filename, String method) throws Exception {
          
          // (1) Open the input stream
          FileInputStream fis = new FileInputStream(new File(filename));
          
          Class codecClass = Class.forName(method);
          
          CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
          
          // (2) Open the compressed output stream
          FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
          CompressionOutputStream cos = codec.createOutputStream(fos);
          
          // (3) Copy the stream
          IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
          
          // (4) Close the resources
          cos.close();
          fos.close();
          fis.close();
      }
    
      // 2. Decompress a file
      private static void decompress(String filename) throws FileNotFoundException, IOException {
          
          // (0) Check whether a codec exists for this file extension
          CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    
          CompressionCodec codec = factory.getCodec(new Path(filename));
          
          if (codec == null) {
              System.out.println("cannot find codec for file " + filename);
              return;
          }
          
          // (1) Open the compressed input stream
          CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
          
          // (2) Open the output stream
          FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
          
          // (3) Copy the stream
          IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
          
          // (4) Close the resources
          cis.close();
          fos.close();
      }
    }
    
    2. Compressing the Map output: even if the job's input and output files are uncompressed, you can still compress the intermediate Map output. Because it is written to disk and transferred over the network to the Reduce nodes, compressing it can improve performance considerably, and it only takes setting two properties:
    package com.xxx.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;   
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
          Configuration configuration = new Configuration();
    
          // Enable map output compression
          configuration.setBoolean("mapreduce.map.output.compress", true);
          // Set the map output compression codec
          configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    
          Job job = Job.getInstance(configuration);
    
          job.setJarByClass(WordCountDriver.class);
    
          job.setMapperClass(WordCountMapper.class);
          job.setReducerClass(WordCountReducer.class);
    
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
    
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
    
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
          boolean result = job.waitForCompletion(true);
    
          System.exit(result ? 0 : 1);
      }
    }
    

    The Mapper stays unchanged:

    package com.xxx.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
      Text k = new Text();
      IntWritable v = new IntWritable(1);
    
      @Override
      protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
    
          // 1 Get one line
          String line = value.toString();
    
          // 2 Split into words
          String[] words = line.split(" ");
    
          // 3 Write out each word
          for(String word:words){
              k.set(word);
              context.write(k, v);
          }
      }
    }
    
    The Reducer stays unchanged:
    package com.xxx.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
      IntWritable v = new IntWritable();
    
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
          
          int sum = 0;
    
          // 1 Sum the counts
          for(IntWritable value:values){
              sum += value.get();
          }
          
           v.set(sum);
    
           // 2 Write out
          context.write(key, v);
      }
    }
    
    3. Compressing the Reduce output
      1. Modify the driver
    package com.xxx.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
          
          Configuration configuration = new Configuration();
          
          Job job = Job.getInstance(configuration);
          
          job.setJarByClass(WordCountDriver.class);
          
          job.setMapperClass(WordCountMapper.class);
          job.setReducerClass(WordCountReducer.class);
          
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
          
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          
          // Enable compression of the reduce output
          FileOutputFormat.setCompressOutput(job, true);
          
          // Set the compression codec
          FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
    //     FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
    //     FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 
          
          boolean result = job.waitForCompletion(true);
          
          System.exit(result ? 0 : 1);
      }
    }
    

    2. The Mapper and Reducer stay unchanged (same as in the Map output compression example above).
