美文网首页spark
spark算子1:repartitionAndSortWithi

spark算子1:repartitionAndSortWithi

作者: 糖哗啦 | 来源:发表于2018-08-23 17:35 被阅读455次

    repartitionAndSortWithinPartitions算是一个高效的算子,是因为它要比使用repartition And sortByKey 效率高,这是由于它的排序是在shuffle过程中进行,一边shuffle,一边排序;具体见spark shuffle的读操作
    关于为什么比repartition And sortByKey 效率高,首先简要分析repartition 和sortbykey'的流程:

    (1)rePartition
    (2)sortByKey

    repartitionAndSortWithinPartitions的使用

    (1)使用repartitionAndSortWithinPartitions时,需要自己传入一个分区器参数,这个分区器 可以是系统提供的,也可以是自定义的:例如以下Demo中使用的KeyBasePartitioner,同时需要自定义一个排序的隐式变量,当我们使用repartitionAndSortWithinPartitions时,我们自定义的my_self_Ordering 排序规则就会传入到def implicitly[T](implicit e: T) = e
    (2)二次排序
    排序规则都需要在自定义的隐式变量my_self_Ordering中实现

    private val ordering = implicitly[Ordering[K]]
    //这里是使用了上下文界定,这个T就是Ordering[K]
    def implicitly[T](implicit e: T) = e 
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
        new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
      }
    
    Demo案例
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
        val sc = new SparkContext(sparkConf)
        val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_数据\\numbers_data.txt")
        val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
    /**
          * key怎么排序,在这里定义
          * 为什么在这里声明一个隐式变量呢,是因为在源码中,方法中有一个隐式参数;不设置是按照默认的排序规则进行排序;
          */
        implicit val my_self_Ordering = new Ordering[String] {
          override def compare(a: String, b: String): Int = {
            val a_b: Array[String] = a.split("_")
            val a_1 = a_b(0).toInt
            val a_2 = a_b(1).toInt
            val b_b = b.split("_")
            val b_1 = b_b(0).toInt
            val b_2 = b_b(1).toInt
            if (a_1 == b_1) {
              a_2 - b_2
            } else {
              a_1 - b_1
            }
          }
        }
    
    val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
    /**
        * 自定义分区器
        *
        * @param partitions
        */
      class KeyBasePartitioner(partitions: Int) extends Partitioner {
         //分区数
        override def numPartitions: Int = partitions
        //该方法决定了你的数据被分到那个分区里面
        override def getPartition(key: Any): Int = {
          val k = key.asInstanceOf[String]
          Math.abs(k.hashCode() % numPartitions)
        }
      }
    
    

    相关文章

      网友评论

        本文标题:spark算子1:repartitionAndSortWithi

        本文链接:https://www.haomeiwen.com/subject/wslpiftx.html