美文网首页
11.Join的MapReduce实现

11.Join的MapReduce实现

作者: 哈哈大圣 | 来源:发表于2019-12-08 00:12 被阅读0次

    Join在MapReduce中的实现

    一、概述

    tips: Hive: MapReduce/Spark巧用 explain 查看语法树

    1. 常见的面试题:描述如何使用MapReduce来实现join功能:考察点

      1. MapReduce执行流程
      2. join的底层执行过程
      3. join的多种实现方式
        • ReduceJoin(有shuffle)
        • MapJoin(没有ReduceJoin,也就是没有Shuffle的过程)
    2. 其他常见面试题

      • Mapper的泛型里面有几个参数,各是啥意思(4 个,文件输入的key/value;写入上下文的key/value)
      • map方法有几个参数,各是啥意思(3 个,文件输入的key/value;上下文)
      • 字符串拼接为啥不建议使用 + 而是StringBuilder
      • Mapper/Reduce的生命周期
    • tips:简历:项目
      • 最新的项目写在最前面
      • 写的东西一定要真正会的(写的一定要会,会的不一定是自己写的,哈哈)
      • 从自己写的东西开始面起,然后逐步扩展=>基金/技术的一个功能链条
      • 想要高薪,得挖相关技术的祖坟!

    二、原始数据

    1. emp.txt
    7369    SMITH   CLERK   7902    1980-12-17  800.00      20
    7499    ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  30
    7521    WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  40
    7566    JONES   MANAGER 7839    1981-4-2    2975.00     20
    7654    MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 30
    7698    BLAKE   MANAGER 7839    1981-5-1    2850.00     30
    7782    CLARK   MANAGER 7839    1981-6-9    2450.00     10
    7788    SCOTT   ANALYST 7566    1987-4-19   3000.00     20
    7839    KING    PRESIDENT       1981-11-17  5000.00     10
    7844    TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    30
    7876    ADAMS   CLERK   7788    1987-5-23   1100.00     20
    7900    JAMES   CLERK   7698    1981-12-3   950.00      30
    7902    FORD    ANALYST 7566    1981-12-3   3000.00     20
    7934    MILLER  CLERK   7782    1982-1-23   1300.00     40
    8888    HIVE    PROGRAM 7839    1988-1-23   10300.00        
    
    1. dept.txt
    10  ACCOUNTING  NEW YORK
    20  RESEARCH    DALLAS
    30  SALES   CHICAGO
    40  OPERATIONS  BOSTON
    

    三、ReduceJoin流程分析以及实现

    1. 数据通过Mapper加载过来,获取当前输入的文件名,确认封装的对象并在对象中标记,以join条件为key写入上下文,然后经过shuffle阶段,然后在Reduce端根据flag分发为不同的对象然后进行排列组合写入文件。

    2. 代码实现

    import lombok.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 前提:数据被ETL处理过,能被Hive直接使用
     * emp.txt
     *      empno   ename   sal
     * dept.txt
     *      deptno  dname
     *
     * emp.txt 与 dept.txt是多对一的关系
     * @author Liucheng
     * @since 2019-12-06
     */
    public class ReduceJoinApp {
    
        public static void main(String[] args) throws Exception {
    
            // 配置类;默认为本地文件系统
            Configuration configuration = new Configuration();
    
            // Job工作类
            Job job = Job.getInstance(configuration);
    
            // 设置住主类
            job.setJarByClass(ReduceJoinApp.class);
    
            // 配置Reducer Task任务个数
            // job.setNumReduceTasks(3);
    
            // 配置Mapper与Reducer
            job.setMapperClass(ReduceJoinMapper.class);
            job.setReducerClass(ReduceJoinReducer.class);
    
            // 告知Mapper的输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Entity.class);
    
            // 告知Reducer的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 本地需要处理的文件
            Path emp = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\emp.txt");
            Path dept = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\dept.txt");
    
            Path outputPath = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\reduce-join");
    
            FileSystem fileSystem = FileSystem.get(configuration);
            fileSystem.delete(outputPath, true);
    
            MultipleInputs.addInputPath(job, emp, TextInputFormat.class);
            MultipleInputs.addInputPath(job, dept, TextInputFormat.class);
    
            FileOutputFormat.setOutputPath(job, outputPath);
    
            job.waitForCompletion(true);
    
        }
    }
    
    class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Entity> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 获取文件名
            String filename = getFileName(context);
    
            String[] datas = value.toString().split("\t");
    
            if (filename.contains("emp.txt")) {
                if (datas.length < 8) {
                    return;
                }
    
                // 员工表处理逻辑
                String deptno = datas[7];
                // 第一个属性flag表示标识来源
                Entity emp = new Entity("emp", datas[0], datas[1], datas[5], deptno);
                context.write(new Text(deptno), emp);
            } else {
                // 部门表处理逻辑
                if (datas.length < 3) {
                    return;
                }
                String deptno = datas[0];
                Entity dept = new Entity("dept", datas[1]);
                context.write(new Text(deptno), dept);
            }
        }
    
        /**
         * 获取文件名的方法
         */
    
        public String getFileName(Context context) {
    
            // 获取记录对应的文件信息
            InputSplit inputSplit = context.getInputSplit();
            Class<? extends InputSplit> splitClass = inputSplit.getClass();
            FileSplit fileSplit = null;
            if (splitClass.equals(FileSplit.class)) {
                fileSplit = (FileSplit) inputSplit;
            } else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
                try {
                    Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                    getInputSplitMethod.setAccessible(true);
                    fileSplit = (FileSplit) getInputSplitMethod.invoke(inputSplit);
                } catch (Exception e) {
                    System.out.println(e);
                    throw new RuntimeException(e);
                }
            }
    
            // 获取文件名
            String fileName = fileSplit.getPath().getName();
            // 获取文件所在的路径名
            String filePath = fileSplit.getPath().getParent().toUri().getPath();
    
            return fileName;
        }
    }
    
    
    class ReduceJoinReducer extends Reducer<Text, Entity, Text, NullWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<Entity> values, Context context) throws IOException, InterruptedException {
    
            List<Entity> emps = new ArrayList<>();
            Entity[] dept = new Entity[1];
    
            values.forEach(entity -> {
                if ("emp".equals(entity.getFlag())) {
                    emps.add(new Entity(entity));
                    System.out.println("刚取出来时" + entity);
                } else if ("dept".equals(entity.getFlag())) {
                    dept[0] = new Entity(entity);
                    System.out.println("刚取出来时" + entity);
                }
            });
    
            emps.forEach(entity -> {
                StringBuilder sb = new StringBuilder();
                sb.append(entity.getEmpnno()).append("\t")
                        .append(entity.getEname()).append("\t")
                        .append(entity.getSal()).append("\t")
                        .append(entity.getDeptno()).append("\t")
                        .append(dept[0].getDname());
    
                try {
                    context.write(new Text(sb.toString()), NullWritable.get());
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    class Entity implements Writable {
    
        private String flag;
        private String empnno;
        private String ename;
        private String sal;
        private String deptno;
        private String dname;
    
        public Entity(String flag, String empnno, String ename, String sal, String deptno) {
            this.flag = flag;
            this.empnno = empnno;
            this.ename = ename;
            this.sal = sal;
            this.deptno = deptno;
        }
    
        public Entity(String flag, String dname) {
            this.flag = flag;
            this.dname = dname;
        }
    
        public Entity(Entity newEntity) {
            this.flag = newEntity.getFlag();
            this.empnno = newEntity.getEmpnno();
            this.ename = newEntity.getEname();
            this.sal = newEntity.getSal();
            this.deptno = newEntity.getDeptno();
            this.dname = newEntity.getDname();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(flag);
            out.writeUTF(empnno != null ? empnno : "");
            out.writeUTF(ename != null ? ename : "");
            out.writeUTF(sal != null ? sal : "");
            out.writeUTF(deptno != null ? deptno : "");
            out.writeUTF(dname != null ? dname : "");
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.flag = in.readUTF();
            this.empnno = in.readUTF();
            this.ename = in.readUTF();
            this.sal = in.readUTF();
            this.deptno = in.readUTF();
            this.dname = in.readUTF();
        }
    
        @Override
        public String toString() {
            return this.hashCode() + "--Entity{" +
                    "flag='" + flag + '\'' +
                    ", empnno='" + empnno + '\'' +
                    ", ename='" + ename + '\'' +
                    ", sal='" + sal + '\'' +
                    ", deptno='" + deptno + '\'' +
                    ", dname='" + dname + '\'' +
                    '}';
        }
    }
    

    四、MapJoin流程分析

    1. 适合数据小,是否有必要全部
    2. shuffle是整个大数据处理过程中非常耗时,非常损耗性能的地方
    3. 能规避shuffle的地方就不要使用shuffle【调优,有些情况下进行shuffle是更加优化,这种情况比较少】
    4. 将小文件的内容写入缓存,读取比较大文件,然后在缓存中根据join条件查找写入缓存即可。
    5. 代码实现
    package com.hahadasheng.bigdata.hadooplearning.reducejoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.*;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author Liucheng
     * @since 2019-12-07
     */
    public class MapperJoinApp {
    
        public static void main(String[] args) throws Exception {
    
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(MapperJoinApp.class);
    
            // 配置Reducer Task任务个数为0
            job.setNumReduceTasks(0);
    
            job.setMapperClass(MapperJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            // 小文件 hdfs文件
            URI dept = new URI("/mapjoin/dept.txt");
            // 将小文件加到分布式缓存中
            job.addCacheFile(dept);
    
            // 大文件 hdfs文件
            Path emp = new Path("/mapjoin/emp.txt");
            // 写入大文件
            FileInputFormat.setInputPaths(job, emp);
    
            Path outputPath = new Path("/mapjoin/map-join");
            FileSystem fileSystem = FileSystem.get(configuration);
            fileSystem.delete(outputPath, true);
            // 文件输出
            FileOutputFormat.setOutputPath(job, outputPath);
    
            job.waitForCompletion(true);
    
        }
    }
    
    class MapperJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        private Map<String, String> deptCatch = new HashMap<>();
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
    
            String filePath = context.getCacheFiles()[0].toString();
            //String filePath = "E:/ImprovementWorkingSpace/hadoop-learning/src/main/resources/join/dept.txt";
            
            // 如下这个方法是在本地读取文件,在HDFS上会报错!
            BufferedReader br = new BufferedReader(new FileReader(filePath));
    
            String line;
            while ((line = br.readLine()) != null) {
                String[] datas = line.split("\t");
                // 部门表处理逻辑
                if (datas.length < 3) {
                    return;
                }
                String deptno = datas[0];
                String dname = datas[1];
                deptCatch.put(deptno, dname);
            }
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] datas = value.toString().split("\t");
    
            if (datas.length < 8) {
                return;
            }
            // 员工表处理逻辑
            String empnno = datas[0];
            String ename = datas[1];
            String sal = datas[5];
            String deptno = datas[7];
    
            StringBuilder sb = new StringBuilder();
            sb.append(empnno).append("\t")
                    .append(empnno).append("\t")
                    .append(ename).append("\t")
                    .append(sal).append("\t")
                    .append(deptno).append("\t")
                    .append(deptCatch.get(deptno));
    
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }
    

    win10环境下有问题,建议在Linux环境下测试。

    • 修改文件core-site.xml添加如下内容
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    
    • 问题解决
    1. 每个节点的core-site.xml要添加配置
    
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    
    2. main方法中的
    
    job.addCacheFile(new URI(xxx))
    
    这里会在hdfs上找对应的文件,如果没有则会报错
    
    3. Mapper中的setup
    
    String filePath = context.getCacheFiles()[0].toString();
    BufferedReader br = new BufferedReader(new FileReader(filePath));
    
    这个是在本地文件系统找文件,因为调用的是Java标准库的API,没有使用Hadoop相关的流,我尝试过使用完成的路径,加上hdfs://hadoop:8020也会报错;
    
    3. 最终,我在hdfs上创建 了目录,并把dept.txt文件上传到该目录,然后在Linux本机同样创建相同的目录,将dept.txt文件也拷贝一份,终于运行成功了
    

    五、小文件

    Hadoop存储TB甚至更大级别的数据集
    file ==> block ==> 3
           ==> DN directory
    元数据信息存放在 NN NameNode 内存中
    100M vs 1k 的信息都会存放在NN中,
    如果小文件越多,NN的压力越大,就算是有备份的NN,也无济于事
    
    什么是小文件:按照自己忍受的程度决定
        CDH blocksize 128M 默认,可以自定制
        blocks的大小设置决定了 元数据信息 大小
        NN的内存多少就决定存储多少
        monitor 监控 小文件的问题
        
    小文件是怎么产生的?
        故障:解决 ==> 为什么会产生这个故障? ==> 解决或者规避故障
        1) 某种手段把数据采集过来
            Flume 如果使用不当,采集到HDFS的数据会有很多的小文件 raw 源数据 (仅仅按照官网配置,没有优化手段)
            Logstash
            从 WebServer 采集到HDFS
        2) MR<进程>/Hive/Spark(Core/SQL/Streaming)<线程>
            ETL预处理 产生很多小文件
            Stat统计分析 数据仓库 分好几层 又是一对小文件
    
    小文件解决方案
        删? 
            原始数据处理完可以,
            ETL: 时间 比如1年 2年之后 ==> 迁移(discp 工具)
            Stat统计: 可以? 不可以 ? 
        合?
            标题党推荐的SequenceFile优点、缺点?,自己要有辨识度
            CombineFileInputFormat 文本、列式(ORC/Parquet)
            Hive合并小文件的参数控制: 性能不咋样 假如没有使用Hive咋办
            Hadoop追加:可以? 不可以?
                离线处理 batch 假如为 ETL ==> 目录
                假如数据是错的 ==> 重跑 append 数据不对了
            HBase: 假如生产上没有使用HBase咋办
            如果生成上使用了Spark/Flink:一个task处理多少数据
            reduce个数决定了 ==> 文件输出的个数 根据环境
                多  ==> 快 但是files多
                少  ==> skew歪斜
            SQL:
            
    
    学大数据要学会造数据。
    学会看英文文档。看一手的东西。愿意去读,长期积累。技术更新迭代很快,只有学会看一手文档才能保持自身的知识迭代。
    大数据调优,要根据线上的环境找合适的方式解决,没有一劳永逸的方案。
    
    
    官网 DistCp 工具 是一个MapReduce作业,但是没有Reduce工程
    
    面试题(查看官网!)
        hadoop2有哪些新特性,那个版本添加进来的
        hadoop3有哪些新特性,那个版本添加进来的
        MapReduce仅有map,不需要reduce的场景
            MapJoin ETL Sqoop
        WebHDFS
        Federation
        Snapshots
        Quotas and HDFS
        Short Circuit Local Reads
        Centralized Cache Management
    

    版本信息介绍

    Cloudera: CDH 手工搭建 CM WebUI搭建
    Hortonworks: HDP
    MapR:
    

    相关文章

      网友评论

          本文标题:11.Join的MapReduce实现

          本文链接:https://www.haomeiwen.com/subject/ahbpgctx.html