在大数据离线批处理中,需求【大表(事实表)与小表(维度表)】关联字段,进行分析
Hive 默认开启Map端Join
-
将小表数据加载到 Hash table file中 分布式缓存,每个task 都有一份
-
将小表数据放到文件中
-
过程:
- 先加载小表数据 存储到Hash table文件中
2.将文件中的数据 存放到分布式缓存中
3.大表中的每个task从分布式缓存中拉取数据
MapJoin只有maptask 没有reduceTask 没有shuffle 提高了性能
Spark Broadcast Join
-
spark采用 广播join 将小表数据放到hash集合中, 广播到executor内存中,被该executor中的task共享,该小表只能读不能写
-
将小表数据放到 变量中
-
开启
image.png
spark.sql.autoBroadcastJoinThreshold 值为-1
- 那么该小表是多大呢, 默认是10M 实际开发中要修改
Performance Tuning - Spark 3.3.0 Documentation (apache.org) - 那么怎样查看一个表的大小
ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
分析表 表名 计算 统计信息 不开启全局扫描
Flink
1_广播变量Broadcast 小表数据广播广播变量
2_分布式缓存 小表数据分布式缓存
广播变量Broadcast
-
Flink在进行批处理 的时候, 将数据封装到DataSet中,然后将小表数据广播到TaskManager中 ,被该TaskManager下的subtask 共享
数据存储在内存中
分布式缓存+广播 -
特点
- 广播的数据不能太大
- 广播的小表数据 只能读不能写
-
步骤
1:广播数据
.withBroadcastSet(DataSet, "name");
在大表数据转换算子 后 加 .withBroadcastset2:获取广播的数据
Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");
使用富函数,获取上下文对象中的....表名3:使用广播数据
-
实例代码
注意是批处理执行环境 可以把executor 省略
package cn.itcast.flink.batch;
//
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/**测试 Flink 小表数据广播到TaskManager内存中,当TM的Slot槽运行subTask子任务时,获取广播的变量值进行处理
* @author ado
*/
public class BatchBroadcastDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
// 将执行环境设置成批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
//大表数据
DataSource<Tuple3<Integer, String, Integer>> scoreDataSet = env.fromCollection(
Arrays.asList(
Tuple3.of(1, "语文", 50),
Tuple3.of(1, "数学", 70),
Tuple3.of(1, "英语", 86),
Tuple3.of(2, "语文", 80),
Tuple3.of(2, "数学", 86),
Tuple3.of(2, "英语", 96),
Tuple3.of(3, "语文", 90),
Tuple3.of(3, "数学", 68),
Tuple3.of(3, "英语", 92)
)
);
// 小表数据
DataSource<Tuple2<Integer, String>> studentDataSeT = env.fromCollection(Arrays.asList(
Tuple2.of(1, "张三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五")
));
// 3. 数据转换-transformation
MapOperator<Tuple3<Integer, String, Integer>, Object> result = scoreDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Object>() {
HashMap<Integer, String> supMap = new HashMap<>();
// todo 2 在open方法中获取 上下文对象
@Override
public void open(Configuration parameters) throws Exception {
//todo 3使用上下文对象获取 广播小表中的数据
List<Tuple2<Integer, String>> list = getRuntimeContext().getBroadcastVariable("students");
//todo 4将小表dataset0中的数据存放到集合当中 准备join key->要关联的字段 value->值
for (Tuple2<Integer, String> value : list) {
supMap.put(value.f0, value.f1);
}
}
@Override
// 实现map算子 完成两个表的join
public String map(Tuple3<Integer, String, Integer> value) throws Exception {
/*
value -> (1, "数学", 70) , 依据学生标号:1 到map集合中获取学生姓名
*/
//获取要join的字段 然后从集合中获取对应的值
Integer stuId = value.f0;
//使用 获取默认值
String stuName = supMap.getOrDefault(stuId, "未知");
return stuName + "," + value.f1 + "," + value.f2;
}
//todo step1 在大表数据的处理算子后面加with...(小表的名称, 小表的广播名称)
}).withBroadcastSet(studentDataSeT, "students");
result.print();
// 4. 数据终端-sink
// 5. 触发执行-execute
// env.execute("dsadajb");
} }
网友评论