How many RDDs does Spark's WordCount create?

Author: 越走越远的风 | Published 2019-04-09 15:21

    The WordCount code is straightforward, so let's start by listing it:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("C:\\Users\\Desktop\\word.txt")
    // split each line into words and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // pair each word with a 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // aggregate by key
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // sort (optional)
    //val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // save the result to HDFS
    reduced.saveAsTextFile(args(1))
    //sorted.collect().foreach(println)
    // release resources
    sc.stop()
    

    1. Let's go through it step by step. The first call is sc.textFile; its source is as follows:

    /**
       * Read a text file from HDFS, a local file system (available on all nodes), or any
       * Hadoop-supported file system URI, and return it as an RDD of Strings.
       * @param path path to the text file on a supported file system
       * @param minPartitions suggested minimum number of partitions for the resulting RDD
       * @return RDD of lines of the text file
       */
      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    
     def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
        assertNotStopped()
    
        // This is a hack to enforce loading hdfs-site.xml.
        // See SPARK-11227 for details.
        FileSystem.getLocal(hadoopConfiguration)
    
        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions).setName(path)
      }
    
    @DeveloperApi
    class HadoopRDD[K, V](
        sc: SparkContext,
        broadcastedConf: Broadcast[SerializableConfiguration],
        initLocalJobConfFuncOpt: Option[JobConf => Unit],
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int)
      extends RDD[(K, V)](sc, Nil) with Logging{
      ... (body omitted)
    }
    

    From this we can see that textFile internally creates a HadoopRDD whose records are key-value pairs, and then applies a map to it, i.e. the call in the textFile method above:
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
      .map(pair => pair._2.toString).setName(path)
    Here pair is one key-value record of the HadoopRDD: the key is the byte offset of the line within the file, and the value is the text of that line.
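
    To make this concrete, here is a minimal sketch (not from the original article; it assumes the sc and input path from the code above) that calls hadoopFile directly, exposing the same two RDDs that textFile builds:

        import org.apache.hadoop.io.{LongWritable, Text}
        import org.apache.hadoop.mapred.TextInputFormat
        import org.apache.spark.rdd.RDD

        // RDD #1: the HadoopRDD of (byte offset, line) pairs
        val kv: RDD[(LongWritable, Text)] = sc.hadoopFile(
          "C:\\Users\\Desktop\\word.txt",
          classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
        // RDD #2: the MapPartitionsRDD that keeps only the line text
        val lineText: RDD[String] = kv.map(_._2.toString)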

    2. The map method also returns a new RDD. Let's look at its source:

    // Transformations (return a new RDD)
    
      /**
       * Return a new RDD by applying a function to all elements of this RDD.
       */
      def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }
    

    The MapPartitionsRDD returned here is lines: RDD[String], so this first step already produces two RDDs: the HadoopRDD and this MapPartitionsRDD. Next we look at the flatMap method, whose source follows the short lineage check below.
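
    As a quick sanity check (a sketch, not part of the original article; RDD ids and the exact output format depend on the Spark version), printing the lineage of lines shows both of these RDDs:

        // Shows the MapPartitionsRDD created by textFile's map(...)
        // with the HadoopRDD as its parent.
        println(lines.toDebugString)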

    /**
       *  Return a new RDD by first applying a function to all elements of this
       *  RDD, and then flattening the results.
       */
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }
    

    This again returns a new MapPartitionsRDD.
    3. Next is words.map((_, 1)), which goes through the map method shown earlier and likewise creates one more new RDD; a small worked example of these two transformations follows.
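
    To see what the two narrow transformations do to the data, here is a tiny worked example (a sketch using an in-memory input instead of the file above):

        // "hello world hello" --flatMap(_.split(" "))--> hello, world, hello
        //                     --map((_, 1))-----------> (hello,1), (world,1), (hello,1)
        val demo = sc.parallelize(Seq("hello world hello"))
        val demoPairs = demo.flatMap(_.split(" ")).map((_, 1))
        demoPairs.collect().foreach(println)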

    4. Next, let's look at the reduceByKey method:

    /**
       * Merge the values for each key using an associative and commutative reduce function. This will
       * also perform the merging locally on each mapper before sending results to a reducer, similarly
       * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
       * parallelism level.
       */
      def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        reduceByKey(defaultPartitioner(self), func)
      }
    
    
    /**
       * Merge the values for each key using an associative and commutative reduce function. This will
       * also perform the merging locally on each mapper before sending results to a reducer, similarly
       * to a "combiner" in MapReduce.
       */
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }
    
    
     @Experimental
      def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }
    

    As we can see, because wordAndOne has no partitioner matching the target one, reduceByKey takes the else branch and creates a new ShuffledRDD here.
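
    One way to observe the shuffle boundary from the driver (a sketch, not from the article) is to inspect the dependency that reduceByKey adds:

        import org.apache.spark.ShuffleDependency

        // reduceByKey with the default partitioner places a ShuffleDependency
        // between wordAndOne and reduced, i.e. reduced is a ShuffledRDD.
        println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true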

    5. The saveAsTextFile method:

    def saveAsTextFile(path: String): Unit = withScope {
        // https://issues.apache.org/jira/browse/SPARK-2075
        //
        // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
        // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
        // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
        // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
        // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
        //
        // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
        // same bytecodes for `saveAsTextFile`.
        val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
        val textClassTag = implicitly[ClassTag[Text]]
        val r = this.mapPartitions { iter =>
          val text = new Text()
          iter.map { x =>
            text.set(x.toString)
            (NullWritable.get(), text)
          }
        }
        RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
          .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
      }
    

    Here saveAsTextFile calls mapPartitions, which returns one more new RDD; its source is below, followed by a short sketch of what that internal step builds.

    /**
       * Return a new RDD by applying a function to each partition of this RDD.
       *
       * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
       * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
       */
      def mapPartitions[U: ClassTag](
          f: Iterator[T] => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
          preservesPartitioning)
      }
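
    For intuition, the mapPartitions step inside saveAsTextFile is roughly equivalent to the following (a simplified sketch, not the actual Spark source); this is the last RDD created by the job:

        import org.apache.hadoop.io.{NullWritable, Text}

        // One more MapPartitionsRDD: each (word, count) record becomes a
        // (NullWritable, Text) pair ready to be written by TextOutputFormat.
        val forOutput = reduced.mapPartitions { iter =>
          val text = new Text()
          iter.map { x => text.set(x.toString); (NullWritable.get(), text) }
        }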
    

    The overall flow is as follows:

    [Figure: SparkWordCount执行过程.png — the WordCount execution flow]

    As the walkthrough shows, a total of six RDDs are created: the HadoopRDD and the MapPartitionsRDD from textFile, one MapPartitionsRDD each from flatMap and map, the ShuffledRDD from reduceByKey, and the MapPartitionsRDD built inside saveAsTextFile.
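
    To double-check the count programmatically, one could walk the lineage of reduced (a rough sketch, not from the article): it counts the five RDDs created by the transformations, and saveAsTextFile then adds the sixth internally.

        import org.apache.spark.rdd.RDD

        // Recursively count an RDD plus every ancestor reachable via its dependencies.
        def countLineage(rdd: RDD[_]): Int =
          1 + rdd.dependencies.map(dep => countLineage(dep.rdd)).sum

        println(countLineage(reduced))  // 5: HadoopRDD, 3 MapPartitionsRDDs, ShuffledRDD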
