美文网首页
spark flatMap 使用

spark flatMap 使用

作者: pcqlegend | 来源:发表于2018-05-25 16:41 被阅读0次

    /**

    • Return a new RDD by first applying a function to all elements of this
    • RDD, and then flattening the results.
      */
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }

    对这个RDD内的所有元素调用函数然后 并打平list 返回一个新的RDD
    需要注意的是 这个操作必须是集合类的RDD

    scala> val list = sc.parallelize(List(1,2,3,4,5,6))
    scala> list.flatMap(x =>x ).foreach(println(_))
    <console>:27: error: type mismatch;
     found   : Int
     required: TraversableOnce[?]
           list.flatMap(x =>x ).foreach(println(_))
                            ^
    
    

    有时候我们可能只想把List类型的RDD打平,而不像做map操作可以直接

    scala> val list2 = sc.parallelize(List(List(1,2,3,4,5,6),List(2,3,4)))
    list2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> list2.flatMap(x=>x).foreach(println(_))
    [Stage 0:>                                                          (0 + 0) / 8]1
    2
    3
    4
    5
    6
    2
    3
    4
                                     
    

    注意flatMap 里面 x=>x 其实这个x并不是你的list中的每一个元素,而是你RDD的中的每个list。也就是List(1,2,3,4,5,6) 或 List(2,3,4)

    scala> list2.flatMap(x=>{println(x); x}).foreach(item => println("haha"))
    List(2, 3, 4)
    List(1, 2, 3, 4, 5, 6)
    haha
    haha
    haha
    haha
    haha
    haha
    haha
    haha
    haha
    

    所以说如果需要做map操作的话需要这样

    scala> list2.flatMap(x=> {for(a<-x ) yield {a +1}}).foreach(item => println(item))
    2
    3
    4
    5
    6
    7
    3
    4
    5
    
    

    相关文章

      网友评论

          本文标题:spark flatMap 使用

          本文链接:https://www.haomeiwen.com/subject/cfrujftx.html