Spark09 RDD分区与并行调优

作者: 山高月更阔 | 来源:发表于2020-06-14 18:05 被阅读0次

向 Spark 传递函数

在 Python 中如果函数比较短可以用 lambda 方式,当然也可以用传递函数
如:

word = rdd.filter(lambda s : 'error' in s)
def containsError(s):
    return 'error' in s
word = radd.filter(containsError)

在 Spark 中我们知道 'error' in s 这段代码是在 executor 中执行的,函数的定义是在 Driver 中。所以 Spark 会把 containsError 这个函数发送到 executor 中。所以如果 containsError 函数调用了其他变量 函数 类 等 都会经过序列化后发送到 executor 中。 lambada 也是一样的,lambda 不过就是不需要定义函数而已,可以理解为 lambda 是一个匿名函数。比如:

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
    def isMatch(self, s):
        return self.query in s
    def getMachesMemberReference(self, rdd):
        return rdd.filter(lambda x : self.query in x)
    def getMachesFunctionsReference(self, rdd):
        return rdd.filter(self.isMatch)

上述代码在执行是会报错。原因在于 Spark 向 executor 发送函数是发现有self.query 含有 SearchFunctions 的引用字段。 就需要把SearchFunctions 类实例的对象序列化后全部发送到 executor 中去。在 Python 中 对象序列化会报错。所以上述代码会执行报错。
在 Java 或 Scala 中可以让类实现 Serializable 接口 就可以将对象序列化。当然不是所有对象都是可以序列化的。比如 数据库连接,网络通道等。所以在实际中尽量避免将对象序列化传到 executor 中。上述例子可以做如下修改

 def getMachesMemberReference(self, rdd):
        query = self.query
        return rdd.filter(lambda x : query in x) # 该方式不会有类的引用字段 就不需要将对象序列化传递到 executor 中去

数据分区

在执行 RDD 的算子是会把数据分成多组发送到不同 executor 中执行 比如

rdd.filter(lambda x : len(x) > 0) #默认分区 并行执行

如果需要指定分区数用 repartition 算子

rdd.repartition(10).filter(lambda x : len(x) > 0)  # 将数据分为10分区

repartition 算子他会把数据通过网络进行混淆,并创建出新的分区集合。对数据进行重新分区往往代价比较大。

分区并不是对所有的应用都有好处,比如给定的 RDD 只需要扫描一次,就完全没必要分区。

Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键值对的函数对元素进行分区。尽管 Spark 没有给出显示控制每个键具体落在哪一个 executor 节点上的方法,但 Spark 可以确保同一组的键出现在同一个节点上。比如,可以使用哈希分区将一个 RDD 分成100分分区,此时键的哈希值对100取模的结果相同的记录会被放在一个节点上。

举例:有一张很大的用户信息表,由(UserId, UserInfo)组成的RDD。有另外一张表(UserId, LinkInfo)对组成的表。存放过去5分钟内某网站各个用户访问的情况。我们要统计用户访问情况,可以使用 Spark join 方法 关联 UserInfo 和 LinkInfo 的组合。简单代码如下:
Python

userDataRdd =  #通过文件或其他方式加载数据 结构为(UserId, UserInfo)
def processNewLog():
   eventsRdd = #通过文件或其他方式加载过去5分钟用户行为数据 结构为(UserId, LinkInfo)
   userAndLinkInfoRdd = userDataRdd.join(eventsRdd)
   # 可继续对userAndLinkInfoRdd 进行 map filter 等操作  获取需要的数据

上述代码能正确运行但效率不高。每次调用 processNewLog 是都会进行 join 操作。我们对数据集如何分区一无所知,默认情况下,连接操作会将两个数据集中的所有键的 Hash 值都求出来,将该 Hash 值相同的记录通过网络传同一台机器上,然后在那台机器上对所有键相同的记录进行连接。因为 UserInfo 表比 5 分钟一次的 eventsRdd 要大的多,所以会浪费时间做很多无用工作,在每次调用都会对 UserInfo 表进行 Hash 值计算和夸节点数据混洗,虽然这些数据从来不会变化。

要解决这一问题也很简单。在程序开始时对 UserInfo 做 partitionBy 操作

userDataRdd = userDataRdd.partitionBy(100)

processNewLog 方法保持不变。在 Java 或 Scala 中需要将 HashPartitioner 传入partitionBy方法 比如:

userDataRdd = userDataRdd.partitionBy(new HashPartitioner(100))

eventsRdd 数据仅使用一次 所以指定分区没有什么用处。由于 userDataRdd 调用了 partitionBy,Spark 就知道了该 RDD 是根据键的哈希值来分区的。这样在调用 join时,Spark 会利用到这一点。当调用 userDataRdd.jonin(eventsRdd) 时,Spark 只会对 eventsRdd 进行数据混洗操作,将 eventsRdd 中特定 UserId 的记录发送到 UserDataRdd 的对应分区所在的那台机器上,这样需要的网络传输数据就会大大减少,程序运行效率也显著提高了。
注意 partitionBy 是返回一个新的 RDD 不是将原 RDD 数据进行修改。
其他一些算子也会利用到以后的分区信息 如 sortByKey 和 groupByKey,另一方面 诸如 map 等操作会导致新的 RDD 失去父 RDD 的分区 信息。

在 Spark glom 算子 partitioner 算子可以查看 RDD 的分区情况

能从分区中获益的操作 cogroup、groupWith、 join、 leftOuterJoin、 rightOuterJoin、groupByKey、reduceByKey、combineByKey以及lookup

影响分区方式的操作

Spark 内部知道各种操作如何影响分区方式,并将会对数据进行分区的操作的结果RDD 自动设置为对应的分区器。比如 Join 连接两个 RDD; 由于键相同的元素会被哈希到同一台机器上, Spark 知道输出结果也是 Hash 分区的,这样对连接结果进行诸如 reduceByKey 这样操作时就会明显变快。

不过转换算子就不一定了 比如 map 理论上 map 是可以修改key 的。因此结果就不会有固有分区。不过Spark 提供的 mapValues和flatMapValues。它们可以保证二元组的键保持不变。

这里列出会为生成结果 RDD 设好分区方式的操作 cogroup、 groupWith 、join 、leftOuterJoin、 rightOuterJoin、 groupByKey、 reduceByKey、 combineByKey 、partitionBy、 sort、 mapValues(如果父RDD有分区)、 flatMapValues(如果父RDD有分区)、 filter(如果父RDD有分区)

相关文章

  • Spark09 RDD分区与并行调优

    向 Spark 传递函数 在 Python 中如果函数比较短可以用 lambda 方式,当然也可以用传递函数如: ...

  • SPARK[RDD之partitions]

    RDD是容错、并行的数据结构,具备分区的属性,这个分区可以是单机上分区也可以是多机上的分区,对于RDD分区的数量涉...

  • 2019-03-16 spark 分区

    spark 分区与性能。 RDD 的分区并行运行1个并发的任务,达到CPU的数量或者2-3倍。 比较好的分区是ex...

  • Flink 程序执行调优(持续更新中...)

    1. 消费kafka的调优 关于并行度消费kafka的并行度我们都知道source端保持跟kafka的分区一致,因...

  • 宽依赖和窄依赖

    窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据...

  • Spark core RDD API

    1. RDD 的概述 1.1 RDD 的优势 高效容错 可以控制数据的分区来优化计算性能 并行处理 提供了丰富的操...

  • Spark中repartition和coalesce的用法

    在Spark的Rdd中,Rdd是分区的。 有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多...

  • 《Spark快速大数据分析》读书笔记——并行度

    并行度一个RDD针对多个数据分区会并行运行同样的计算。并行度是并行执行的任务数量,而不是切分任务的数量。 并行度推...

  • Spark_day04

    RDD的 Shuffle 和 分区 RDD的分区操作2.Shuffle 的原理 分区的作用 RDD 使用分区来分布...

  • Spark的优化

    1.RDD重新分区 针对大量小分区的RDD,使用RDD重分区函数coalesce将小分区合并成大分区;同样当分区数...

网友评论

    本文标题:Spark09 RDD分区与并行调优

    本文链接:https://www.haomeiwen.com/subject/eahrzhtx.html