美文网首页Spark深入学习
【Spark Java API】Transformation(5

【Spark Java API】Transformation(5

作者: 小飞_侠_kobe | 来源:发表于2016-02-05 14:34 被阅读485次

    cartesian


    官方文档描述:

    Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in `this` and b is in `other`.
    

    函数原型:

    def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
    

    源码分析:

     def getPartitions: Array[Partition] = {  
    // create the cross product split  
    val array = new Array[Partition(rdd1.partitions.length * rdd2.partitions.length)  
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {    
        val idx = s1.index * numPartitionsInRdd2 + s2.index    
        array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)  
      }  array
    }
    
    def getDependencies: Seq[Dependency[_]] = List(  
    new NarrowDependency(rdd1) {    
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2) 
     },  
    new NarrowDependency(rdd2) {    
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)  
    }
    )
    

    **
    Cartesian 对两个 RDD 做笛卡尔集,生成的 CartesianRDD 中 partition 个数 = partitionNum(RDD a) * partitionNum(RDD b)。从getDependencies分析可知,这里的依赖关系与前面的不太一样,CartesianRDD中每个partition依赖两个parent RDD,而且其中每个 partition 完全依赖(NarrowDependency) RDD a 中一个 partition,同时又完全依赖(NarrowDependency) RDD b 中另一个 partition。具体如下CartesianRDD 中的 partiton i 依赖于 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。
    **

    实例:

    
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    
    JavaPairRDD<Integer,Integer> cartesianRDD = javaRDD.cartesian(javaRDD);
    System.out.println(cartesianRDD.collect());
    

    distinct


    官方文档描述:

    Return a new RDD containing the distinct elements in this RDD.
    

    函数原型:

    def distinct(): JavaRDD[T]
    
    def distinct(numPartitions: Int): JavaRDD[T]
    

    **
    第一个函数是基于第二函数实现的,只是numPartitions默认为partitions.length,partitions为parent RDD的分区。
    **

    源码分析:

    def distinct(): RDD[T] = withScope {  distinct(partitions.length)}
    
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {  
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
    

    **
    distinct() 功能是 deduplicate RDD 中的所有的重复数据。由于重复数据可能分散在不同的 partition 里面,因此需要 shuffle 来进行 aggregate 后再去重。然而,shuffle 要求数据类型是 <K, V> 。如果原始数据只有 Key(比如例子中 record 只有一个整数),那么需要补充成 <K, null> 。这个补充过程由 map() 操作完成,生成 MappedRDD。然后调用上面的 reduceByKey() 来进行 shuffle,在 map 端进行 combine,然后 reduce 进一步去重,生成 MapPartitionsRDD。最后,将 <K, null> 还原成 K,仍然由 map() 完成,生成 MappedRDD。
    **

    实例:

    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    
    JavaRDD<Integer> distinctRDD1 = javaRDD.distinct();
    System.out.println(distinctRDD1.collect());
    JavaRDD<Integer> distinctRDD2 = javaRDD.distinct(2);
    System.out.println(distinctRDD2.collect());
    

    相关文章

      网友评论

        本文标题:【Spark Java API】Transformation(5

        本文链接:https://www.haomeiwen.com/subject/zioekttx.html