漫谈join算法

作者: LittleMagic | 来源:发表于2019-08-01 22:14 被阅读55次

    Prologue

    join是我们这些整天与数据打交道的人绕不开的一个词,不管是在传统的关系型数据库,还是在大数据领域的数据仓库/数据湖中,join都是常客。特别是对于OLAP业务而言,几乎每个查询都需要用join来建立表之间的关系,地位非常之重要。本文就来简单讲解一些主要的join算法(真的非常简单哦)。

    Nested-loop join & Block nested-loop join

    Nested-loop (NL) join是所有join算法中最naive的一种。假设有两张表R和S,NL join会用二重循环的方法扫描每个(r, s)对,如果行r和行s满足join的条件,就输出之。显然,其I/O复杂度为O(|R||S|)。

    随着参与join的表个数增加,循环嵌套的层数就越多,时间复杂度也越高。因此虽然它的实现很简单,但效率也较低,所以在它的基础上衍生出了改进方案:Block nested-loop (BNL) join。

    BNL join的思路是:对于外层循环的表R,不再逐行扫描,而是一次加载一批(即所谓block)数据进入内存,并且将它们按join key散列在哈希表里,然后也按批扫描表S,并与哈希表进行比对,能够对的上的行就作为join的输出。一个block数据的大小一般是一个或多个内存页的容量。这样就可以将I/O复杂度从O(|R||S|)降低到O[p(R) * p(S) / M],其中p(R)、p(S)分别代表R和S换算成页数的大小,M代表可用内存中的总页数。

    下图示出BNL join的过程,以及NL/BNL join的伪码描述。

    在MySQL 5.5版本之前,其join算法只有原生的NL join,5.5版本之后则引入了BNL join进行优化,具体可以参考其官方文档中对应的Nested-Loop Join Algorithms章节。MySQL本身不是为了OLAP业务设计的,因此BNL join已经能够满足非大量数据的join需求。

    Hash join & Grace hash join

    以数仓维度建模的思想考虑,OLAP业务大多是维度表和事实表的join。由于维度表一般较小,如果它可以整体放进内存,那么就可以通过两步完成join。设R为维度表(小表),S为事实表(大表):

    1. 将R的所有数据按key散列,构成哈希表,value就是原来的行数据;
    2. 扫描S,计算key的哈希值并观察是否在哈希表内,输出结果。

    其过程如下图所示。

    其中,第1步名为build阶段(建立哈希表),第2步名为probe阶段(探测哈希表)。相应地,小表R就称为build table,大表S就称为probe table。Hash join的I/O复杂度是O(|R| + |S|),效率很高,但是它受到两点限制:

    1. 小表必须保证能完全放入内存;
    2. 只适用于equi join,即仅包含"="条件的连接。

    如果小表不能完全放入内存,就只能分批加载,实质上就退化成了BNL join算法。为了避免这种退化,也有一个优化方案,即Grace hash join算法。它的思想也很直接:将R、S两张表都以join key散列到分区,然后对于划分到同一个分区的R、S分片数据分别再进行原生Hash join的build与probe过程(注意,分区与join两个阶段所用的哈希函数是不同的),将所有分片合并起来就是最终的join结果。

    由此可见,Grace hash join消灭了对S表的重复扫描,I/O复杂度为O[p(R) + p(S)],理论效率比BNL join高很多了。

    下图示出Grace hash join的过程。其名称中的Grace并非人名,而是首个采用这种算法的数据库系统的名字。

    MySQL中目前还没有Hash join的实现,但在Oracle的7.3版本之后引入了Hash join算法,专门用来优化大小表join的情况。在大数据领域中Hash join是绝对的标配,Spark SQL就充分利用了它,并且又分为两种情况:

    • Broadcast hash join
      如果build table足够小,小到在整个Spark集群内分发它的overhead(比如对带宽的占用)不会造成明显影响的话,就可以将build table数据经由Driver广播到所有Executor所在的节点,然后在每个Executor上分别执行Hash join,再汇总结果即可。小表能够被广播出去的大小阈值由Spark配置项spark.sql.autoBroadcastJoinThreshold指定,默认值为10MB。

    • Shuffle hash join
      如果build table仍然不大,但是已经超过了广播的阈值,就会采用此种方法。我们对Shuffle的概念已经非常熟悉,这里的Shuffle就是指将build table和probe table分别按key进行分区,并重新分布到各个节点的过程。Shuffle完成后,仍然是执行传统的Hash join。可见,这个思路其实就是Grace hash join的Spark SQL版本的实现。

    关于Spark SQL中join的实现方法,前人已经写过非常好的分析文章,参见hbasefly的博客。作为Spark重度用户,本来是想随着介绍join算法一起说两句的,想想还是不做重复工作了,毕竟时间宝贵得很。

    Sort-merge join

    从前面的介绍,看官应该可以知晓一点:NL/BNL join适用于小表与小表的连接,(Grace) Hash join适用于小表与大表的连接。那么两张大表的连接用什么方法比较好呢?答案就是Sort-merge join。

    实际上两张大表join完全可以用Grace hash join来做,但是Sort-merge join提供了另一种思路:它首先根据R和S的join key分别对两张表进行排序,然后同时遍历排序后的R和S,如果遇到了相同的key就输出,否则继续取下标较小的一方的数据。英文维基上提供了一份很好的伪码,如下:

     function sortMerge(relation left, relation right, attribute a)
         var relation output
         var list left_sorted := sort(left, a) // Relation left sorted on attribute a
         var list right_sorted := sort(right, a)
         var attribute left_key, right_key
         var set left_subset, right_subset // These sets discarded except where join predicate is satisfied
         advance(left_subset, left_sorted, left_key, a)
         advance(right_subset, right_sorted, right_key, a)
         while not empty(left_subset) and not empty(right_subset)
             if left_key = right_key // Join predicate satisfied
                 add cartesian product of left_subset and right_subset to output
                 advance(left_subset, left_sorted, left_key, a)
                 advance(right_subset, right_sorted, right_key, a)
             else if left_key < right_key
                advance(left_subset, left_sorted, left_key, a)
             else // left_key > right_key
                advance(right_subset, right_sorted, right_key, a)
         return output
    
     // Remove tuples from sorted to subset until the sorted[1].a value changes
     function advance(subset out, sorted inout, key out, a in)
         key := sorted[1].a
         subset := emptySet
         while not empty(sorted) and sorted[1].a = key
             insert sorted[1] into subset
             remove sorted[1]
    

    可见,Sort-merge join的时间主要消耗在了排序上,其I/O复杂度可以表示为O[p(R) + p(S) + p(R) · logp(R) + p(S) · logp(S)],或者渐近地简化为O[p(R) · logp(R) + p(S) · logp(S)]。它的复杂度比Grace hash join还要高,但是当join key已经有序或者基本有序时,它的威力就显现出来了。例如,如果在RDBMS中以两张表的主键作为join key,主键索引实际上就是有序的。

    再以Spark SQL为例,由于在Sort-merge join时会先进行Shuffle,而当前Spark采用的是Sort-based shuffle算法(关于其细节,可以参考我之前写的文章),所以实际上变相地省去了sort步骤,只剩下merge了。Spark SQL中的Sort-merge join实现如下图所示。

    仍然出自http://hbasefly.com/2017/03/19/sparksql-basic-join/

    Grace hash join与Sort-merge join的相同点在于都采用了分治(divide-and-conquer)的思想。前者是将数据散列成分片,然后分别处理;后者的归并排序算法天然地就是分治法的实现,不管内部归并还是外部归并都是如此。

    The End

    晚安晚安。

    相关文章

      网友评论

        本文标题:漫谈join算法

        本文链接:https://www.haomeiwen.com/subject/lgpbkctx.html