1.业务数据Sqoop导入数据时倾斜
首先了解Sqoop两个参数-m和--split-by参数的使用:
1. 这俩参数一般是放在一起使用
2.-m:表明需要使用几个map任务并发执行
m,–num-mappers 启动N个map来并行导入数据,默认是4个,最好不要将数字设置为高于集群的节点数
3.--split-by :–split-by 表的列名,用来切分工作单元,一般后面跟主键ID
拆分数据的字段. -m设置为4,数据有100条,sqoop首先会获取拆分字段的最大值,最小值,步长为100/4=25;
那么第一个map执行拆分字段值为(1,25)之间的数据
第二个map执行拆分字段值为(26,50)之间的数据
第三个map执行拆分字段值为(51,75)之间的数据
第四个map执行拆分字段值为(76,100)之间的数据
方案1.如果数据的主键字段本来就不均衡则sqoop运行时机会产生数据倾斜
解决方案:
查询语句中使用row_number产生一个新的排序字段rk,再指定–split-by rk,如此可以均匀切分数据
select *, rank() over( ) as 'rk' FROM user_info
sqoop import \
--connect jdbc:mysql://hadoop102:3306/test \ 连接
--username root \ 账号
--password 123456 \ 密码
--target-dir /sqoop/optputs \ 目标文件夹
--delete-target-dir \ 删除原有的文件夹
--query "select * , rank() over( ) as rk from user_info where \$CONDITIONS"\ 查询的sql语句
--num-mappers 1 \ 设置map数
--split-by rk \按照rk切分
--fields-terminated-by '\t' \ 存储数据的字段间隔
方案2.有比较均匀的索引
若表中有比较均匀的索引 index2 按照index2 split-by仍然跑的很慢
解决方案 :
可能mysql优化器选择的不是最佳索引因此可以采用 force index(index2) 强制使用index2为执行查询的索引
sqoop import \
--connect jdbc:mysql://hadoop102:3306/test \ 连接
--username root \ 账号
--password 123456 \ 密码
--target-dir /sqoop/optputs \ 目标文件夹
--delete-target-dir \ 删除原有的文件夹
--query "select * , rank() over( ) as rk from user_info force index(index2) where \$CONDITIONS"\查询的sql语句
--num-mappers 1 \ 设置map数
--split-by rk \按照rk切分
--fields-terminated-by '\t' \ 存储数据的字段间隔
方案3.采用FlickCDC
使用FlinkCDC配合flink CheckPoink 读取mysql数据转为Flink的DataStream,在使用Flink数据倾斜处理办法,最后写入HDFS。
2.埋点数据数据传输倾斜
埋点数据量非常大,可以采用分治思想,采用Flume配置kafkaChannel把不同的event发送给不同的kafkaTopic
event传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header(最常用时间戳)用来存放该event的一些属性,为K-V结构。Body用来存放该条数据,形式为字节数组。可以跟去header中key值的不同发往不同的kafkaTopic
Kafka分区规则
若key为空,则随机发往各个分区
Key不为空根据类似hash(key)对分区数取模发往不同分区。
网友评论