美文网首页
Hive-Spark-Flink 大小表join

Hive-Spark-Flink 大小表join

作者: Eqo | 来源:发表于2022-08-21 08:59 被阅读0次

在大数据离线批处理中,需求【大表(事实表)与小表(维度表)】关联字段,进行分析

Hive 默认开启Map端Join

  • 将小表数据加载到 Hash table file中 分布式缓存,每个task 都有一份

  • 将小表数据放到文件中


  • 过程:

  1. 先加载小表数据 存储到Hash table文件中
    2.将文件中的数据 存放到分布式缓存中
    3.大表中的每个task从分布式缓存中拉取数据
    MapJoin只有maptask 没有reduceTask 没有shuffle 提高了性能

Spark Broadcast Join

  • spark采用 广播join 将小表数据放到hash集合中, 广播到executor内存中,被该executor中的task共享,该小表只能读不能写

  • 将小表数据放到 变量中


  • 开启


    image.png

spark.sql.autoBroadcastJoinThreshold 值为-1

Flink

1_广播变量Broadcast 小表数据广播广播变量
2_分布式缓存 小表数据分布式缓存

广播变量Broadcast

  • Flink在进行批处理 的时候, 将数据封装到DataSet中,然后将小表数据广播到TaskManager中 ,被该TaskManager下的subtask 共享
    数据存储在内存中
    分布式缓存+广播

  • 特点

    • 广播的数据不能太大
    • 广播的小表数据 只能读不能写
  • 步骤
    1:广播数据
    .withBroadcastSet(DataSet, "name");
    在大表数据转换算子 后 加 .withBroadcastset

    2:获取广播的数据
    Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");
    使用富函数,获取上下文对象中的....表名

    3:使用广播数据

  • 实例代码
    注意是批处理执行环境 可以把executor 省略

package cn.itcast.flink.batch;
//
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**测试  Flink  小表数据广播到TaskManager内存中,当TM的Slot槽运行subTask子任务时,获取广播的变量值进行处理
 * @author ado
 */
public class BatchBroadcastDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        // 将执行环境设置成批处理 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 数据源-source
        //大表数据
        DataSource<Tuple3<Integer, String, Integer>> scoreDataSet = env.fromCollection(
                Arrays.asList(
                        Tuple3.of(1, "语文", 50),
                        Tuple3.of(1, "数学", 70),
                        Tuple3.of(1, "英语", 86),
                        Tuple3.of(2, "语文", 80),
                        Tuple3.of(2, "数学", 86),
                        Tuple3.of(2, "英语", 96),
                        Tuple3.of(3, "语文", 90),
                        Tuple3.of(3, "数学", 68),
                        Tuple3.of(3, "英语", 92)
                )
        );

        // 小表数据
        DataSource<Tuple2<Integer, String>> studentDataSeT = env.fromCollection(Arrays.asList(

                Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五")
        )); 

        // 3. 数据转换-transformation
        MapOperator<Tuple3<Integer, String, Integer>, Object> result = scoreDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Object>() {

            HashMap<Integer, String> supMap = new HashMap<>();


            // todo 2 在open方法中获取 上下文对象
            @Override
            public void open(Configuration parameters) throws Exception {


                //todo 3使用上下文对象获取 广播小表中的数据
                List<Tuple2<Integer, String>> list = getRuntimeContext().getBroadcastVariable("students");
                //todo 4将小表dataset0中的数据存放到集合当中 准备join key->要关联的字段 value->值
                for (Tuple2<Integer, String> value : list) {
                    supMap.put(value.f0, value.f1);

                }
            }

            @Override
            // 实现map算子 完成两个表的join
            public String map(Tuple3<Integer, String, Integer> value) throws Exception {
                        /*
                            value -> (1, "数学", 70)  ,  依据学生标号:1 到map集合中获取学生姓名
                         */
                //获取要join的字段 然后从集合中获取对应的值
                Integer stuId = value.f0;
                //使用 获取默认值
                String stuName = supMap.getOrDefault(stuId, "未知");
                return stuName + "," + value.f1 + "," + value.f2;
            }

            //todo step1 在大表数据的处理算子后面加with...(小表的名称, 小表的广播名称)
        }).withBroadcastSet(studentDataSeT, "students");
        result.print();


        // 4. 数据终端-sink

        // 5. 触发执行-execute
//        env.execute("dsadajb");

}  }

相关文章

网友评论

      本文标题:Hive-Spark-Flink 大小表join

      本文链接:https://www.haomeiwen.com/subject/pxdpgrtx.html