背景
一张ip表,一张ip地理信息表,地理信息表每条数据包含了ip地址的起点和终点以及一些地理信息, 需要用 ip 去关联 gep_ip 中匹配相应的信息 。
例如:
数据条数为 50 M 的表 ip_record,数据格式大致如下:
ip_int | info |
---|---|
123456789 | xx |
987654321 | xx |
数据条数为 7 M 的表 geoip ,数据格式大致如下:
ipstart | ipend | country | province | city | ... |
---|---|---|---|---|---|
0 | 10000 | ... | ... | ... | ... |
10001 | 25000 | ... | ... | ... | ... |
native join
ip_record 和 geoip 关联,找出ip对应的geo信息,写出的 sql 应该是这样的:
SELECT A.*,
B.*
FROM ip_record A
JOIN geoip B
ON A.ip_int >= B.ipstart
AND A.ip_int <= B.ipend
会触发一个 cartesian product ,然后通过 filter 筛出你需要的数据。
broadcast join ?
SELECT
/*+ broadcast(B) */
A.*,
B.*
FROM
ip_record A
JOIN geoip B
ON A.ip_int >= B.ipstart
AND A.ip_int <= B.ipend
会触发 BroadcastNestedLoopJoin
,每条 record 都会产生大量的循环。
上述两种方法都会有 50M * 7M 次的循环。
解决方法
- 将 geoip 表的 ipstart 转化为一个列表,进行广播。
- 遍历 record 表,在广播列表中使用二分查找到相应 ipstart。
pySpark (2.X) 代码实现:
from bisect import bisect_right
from pyspark.sql.types import LongType
#选取 ipstart 字段,排序广播
geo_start_bd = sc.broadcast(geo_ip
.select("ipstart")
.orderBy("ipstart")
.rdd
.flatMap(lambda x: x)
.collect())
#二分查找,找到对应start
def find_le(x):
i = bisect_right(geo_start_bd.value, x)
if i:
return geo_start_bd.value[i-1]
return None
spark.udf.register("find_le",find_le)
spark.sql("""
select
a.ip_int,b.country,b.province,b.city,b.isp
from
(select *,find_le(ip_int) as ipstart from ip_record) a
left join geo_ip b
on a.ipstart = b.ipstart
""")
执行计划 变成了 sortMergeJoin 。
结论
时间复杂度:O(N * M) -> O(N * LOG(M))
。N 为 record 数量,M 为 geo_ip 表数量。
测试环境:
- spark 2.2
- executor(3c 12g) * 15
- 所有record的数据分区数为 45
在这个场景中计算耗时:185 hour (预估,如果能计算出来) -> 2 min,性能提升了10000X
geo_ip 不变:计算时间随 record 数量变化表:
record | cartesianProduct | broadcastNestLoopJoin (广播 geo_ip) | after optimized |
---|---|---|---|
10^4 | 6.2 min | 3.5 min | 27s |
10^5 | 66 min | 30 min | 33s |
10^6 | - | - | 27s |
10^7 | - | - | 27s |
10^8 | - | - | 51s |
网友评论