美文网首页大数据技术分享
Spark系列——关于 mapPartitions的误区

Spark系列——关于 mapPartitions的误区

作者: code_solve | 来源:发表于2019-09-25 18:21 被阅读0次

前言

今天 Review 了一下同事的代码,
发现其代码中有非常多的 mapPartitions,
问其原因,他说性能比 map 更好。
我说为什么性能好呢?
于是就有了这篇文章

网上推崇 mapPartitions 的原因

  • 执行次数变少,速度更快

    按照某些文章的原话来说
    一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。
    我想说的是:
    一次函数调用会处理一个partition所有的数据,
    确实是可以节省你调用函数的那微乎其微的时间开销,
    但是这个节省的时间真的太小了,
    尤其是对与spark这种框架,
    本身就不是用来做毫秒级响应的东西,
    甚至硬要扯的话,你引入迭代器,
    做迭代器的操作难道就不要消耗时间的么?

    如果说上面这种说法还有那么一丢丢靠谱的话,
    有些说法就真的让我很无语了,
    比如说:
    如果是普通的map,比如一个partition中有1万条数据;ok, 那么你的function要执行和计算1万次。 但是,使用MapPartitions操作之后,一个task仅仅会执行一次function, function一次接收所有的partition数据。只要执行一次就可以了,性能比较高
    这种说法如果按照上面的方式来理解其实也是那么一回事,
    但是也很容易让一些新人理解为:
    map要执行1万次,而 MapPartitions 只需要一次,这速度杠杠的提升了啊
    实际上,你使用MapPartitions迭代的时候,还是是一条条数据处理的,这个次数其实完全没变。

mapPartitions 带来的问题

其实就我个人经验来看,
mapPartitions 的正确使用其实并不会造成什么大的问题,
当然我也没看出普通场景 mapPartitions 比 map 有什么优势,
所以 完全没必要刻意使用 mapPartitions
反而,mapPartitions 会带来一些问题。

  1. 使用起来并不是很方便,这个写过代码的人应该都知道。
    当然这个问题并不是不能解决,我们可以写类似下面的代码,
    确实也变的和 map 简洁性也差不太多,
    恩,我不会告诉你可以尝试在生产环境中用用噢。
    //抽象出一个函数,以后所有的 mapPartitions 都可以用
    def mapFunc[T, U](iterator: Iterator[T], f: T => U) = {
      iterator.map(x => {
        f(x)
      })
    }
    //使用    
    rdd.mapPartitions(x => {
        mapFunc(x, line => {
            s"${line}转换数据"
        })
      })
    
    
  2. 容易造成 OOM,这个也是很多博客提到的问题,
    他们大致会写出如下的代码来做测试,
    rdd.mapPartitions(x => {
        xxxx操作
       while (x.hasNext){
         val next = x.next()
       }
        xxx操作
      })
    
    如果你的代码是上面那样,那OOM也就不足为奇了,
    不知道你注意到了没有,mapPartitions 是接受一个迭代器,
    再返回一个迭代器的,
    如果你这么写代码,就完全没有使用到迭代器的懒执行特性。
    将数据都堆积到了内存,
    真就变成了一次处理一个partition的数据了,
    在某种程度上已经破坏了 Spark Pipeline 的计算模式了。

mapPartitions 到底该怎么用

存在即是道理,
虽然上面一直在吐槽,
但是其确实有存在的理由。
其一个分区只会被调用一次的特性,
在一些写数据库的时候确实很有帮助,
因为我们的 Spark 是分布式执行的,
所以连接数据库的操作必须放到算子内部才能正确的被Executor执行,
那么 mapPartitions 就显示比 map 要有优势的多了。
比如下面这段伪代码

rdd.mapPartitions(x => {
        println("连接数据库")
        val res = x.map(line=>{
          print("写入数据:" + line)
          line
        })
        println("断开数据库")
        res
      })

这样我就一个分区只要连接一次数据库,
而如果是 map 算子,那可能就要连接 n 多次了。

另外一点就是 mapPartitions 提供给了我们更加强大的数据控制力,
怎么理解呢?我们可以一次拿到一个分区的数据,
那么我们就可以对一个分区的数据进行统一处理,
虽然会加大内存的开销,但是在某些场景下还是很有用的,
比如一些矩阵的乘法。

后记

不管你要使用哪个算子,其实都是可以的,
但是大多数时候,我还是推荐你使用 map 算子,
当然遇到一些map算子不合适的场景,
那就没办法了...
不过就算你是真的要使用 mapPartitions,
那么请记得充分发挥一下 迭代器的 懒执行特性。

最后,如果本文对你有帮助,帮忙点个赞呗

相关文章

  • Spark系列——关于 mapPartitions的误区

    前言 今天 Review 了一下同事的代码,发现其代码中有非常多的 mapPartitions,问其原因,他说性能...

  • spark优化技巧(四)

    算子调优 4.1 MapPartitions提升Map类操作性能 spark中,最基本的原则,就是每个task处理...

  • Spark调优(4—6)

    4、算子调优 4.1、MapPartitions提升Map类操作性能 spark中,最基本的原则,就是每个task...

  • Spark学习(六):map & mapPartitions

    对于一些没有用过的函数或者操作,看文字描述总是觉得很晦涩,很难理解,所以学习的时候我比较倾向于先从小例程入手,以便...

  • mapPartitions 使用

    Spark中的map函数是将每个rdd都进行自定义函数处理mapPartitions则是将多个rdd进行分区,对每...

  • spark第一天

    map方法 fillter flatmap mapPartitions mapPartitions是map的一个变...

  • 【Spark Java API】Transformation(1

    mapPartitions 官方文档描述: **mapPartitions函数会对每个分区依次调用分区函数处理,然...

  • mapPartitions

    mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartition...

  • Spark

    Spark 系列:『 Spark 』1. spark 简介 things you need know before...

  • Spark基础系列之一--Spark是什么

    传送门Spark实战系列之一--Spark是什么Spark实战系列之二--什么是RDD以及RDD的常用APISpa...

网友评论

    本文标题:Spark系列——关于 mapPartitions的误区

    本文链接:https://www.haomeiwen.com/subject/xzlwuctx.html