Spark Save Action Operators: Source Code Analysis

Author: Tim在路上 | Published 2022-02-09 08:27
    • saveAsHadoopFile

    Writes the RDD to any file system supported by Hadoop.

    def saveAsHadoopFile(
        path: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[_ <: OutputFormat[_, _]],
        conf: JobConf = new JobConf(self.context.hadoopConfiguration),
        codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
      // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
      // 1. Configure hadoopConf: set the key, value and output format classes
      val hadoopConf = conf
      hadoopConf.setOutputKeyClass(keyClass)
      hadoopConf.setOutputValueClass(valueClass)
      conf.setOutputFormat(outputFormatClass)
      // Configure output compression if a codec was supplied
      for (c <- codec) {
        hadoopConf.setCompressMapOutput(true)
        hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
        hadoopConf.setMapOutputCompressorClass(c)
        hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
        hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
          CompressionType.BLOCK.toString)
      }
      // Configure the output committer
      // Use configured output committer if already set
      if (conf.getOutputCommitter == null) {
        hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
      }
    
      // When speculation is on and output committer class name contains "Direct", we should warn
      // users that they may lose data if they are using a direct output committer.
      val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
      val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
      if (speculationEnabled && outputCommitterClass.contains("Direct")) {
        val warningMessage =
          s"$outputCommitterClass may be an output committer that writes data directly to " +
            "the final location. Because speculation is enabled, this output committer may " +
            "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
            "committer that does not have this behavior (e.g. FileOutputCommitter)."
        logWarning(warningMessage)
      }
    
      FileOutputFormat.setOutputPath(hadoopConf,
        SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
      // Delegate to saveAsHadoopDataset
      saveAsHadoopDataset(hadoopConf)
    }
    

    As the source shows, saveAsHadoopFile takes a path, the key class, the value class, the output format class, a Hadoop JobConf, and an optional compression codec. After these parameters have been applied to the JobConf, it calls saveAsHadoopDataset.
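
    Before following the call into saveAsHadoopDataset, here is a minimal, hypothetical usage sketch of the fully-specified overload (the app name, sample data and output path are illustrative and not from the original article):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object SaveAsHadoopFileExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("SaveAsHadoopFileExample").setMaster("local[2]"))

        // Build a pair RDD of Hadoop Writable key/value pairs.
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
          .map { case (k, v) => (new Text(k), new IntWritable(v)) }

        // Fully-specified overload: path, key class, value class and output format class.
        pairs.saveAsHadoopFile(
          "/tmp/save-as-hadoop-file-output",   // hypothetical output path
          classOf[Text],
          classOf[IntWritable],
          classOf[TextOutputFormat[Text, IntWritable]])

        sc.stop()
      }
    }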

    def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
      val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
      SparkHadoopWriter.write(
        rdd = self,
        config = config)
    }
    

    saveAsHadoopDataset wraps the JobConf and calls the SparkHadoopWriter.write method.

    def write[K, V: ClassTag](
        rdd: RDD[(K, V)],
        config: HadoopWriteConfigUtil[K, V]): Unit = {
      // Extract context and configuration from RDD.
      val sparkContext = rdd.context
      val commitJobId = rdd.id
    
      // Set up a job: prepare the job context and output committer
      val jobTrackerId = createJobTrackerID(new Date())
      val jobContext = config.createJobContext(jobTrackerId, commitJobId)
      config.initOutputFormat(jobContext)
    
      // Assert the output format/key/value class is set in JobConf.
      config.assertConf(jobContext, rdd.conf)
    
      val committer = config.createCommitter(commitJobId)
      committer.setupJob(jobContext)
    
      // Try to write all RDD partitions as a Hadoop OutputFormat.
      try {
        val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
          // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
          // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
          val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
          // Hand each partition off to executeTask, which runs on the executors
          executeTask(
            context = context,
            config = config,
            jobTrackerId = jobTrackerId,
            commitJobId = commitJobId,
            sparkPartitionId = context.partitionId,
            sparkAttemptNumber = attemptId,
            committer = committer,
            iterator = iter)
        })
        // Commit the job once all partition tasks have succeeded
        committer.commitJob(jobContext, ret)
        logInfo(s"Job ${jobContext.getJobID} committed.")
      } catch {
        case cause: Throwable =>
          logError(s"Aborting job${jobContext.getJobID}.", cause)
          committer.abortJob(jobContext)
          throw new SparkException("Job aborted.", cause)
      }
    }
    

    Its main work is: on the driver side, prepare the data source and Hadoop configuration for the job and submit a job, passing executeTask to every partition of the RDD for execution; each task writes out all rows of its partition. If every partition task writes successfully (each committing its attempt via commitTask), the committer commits the job; if any task fails, the job is aborted.
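
    The per-partition work done by executeTask boils down to: open a Hadoop RecordWriter for the task attempt, write every record of the partition iterator, then close the writer and commit the task attempt. The following is a simplified, hypothetical sketch of that loop using the plain Hadoop mapred API; it is not the actual SparkHadoopWriter.executeTask and it omits task-attempt setup and the OutputCommitter:

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.mapred.{JobConf, RecordWriter, Reporter, TextOutputFormat}

    // Simplified sketch only: the JobConf is assumed to already have an output path set
    // (via FileOutputFormat.setOutputPath), as saveAsHadoopFile does before writing.
    def writePartition[K, V](jobConf: JobConf,
                             partitionId: Int,
                             records: Iterator[(K, V)]): Unit = {
      val format = new TextOutputFormat[K, V]()
      val writer: RecordWriter[K, V] = format.getRecordWriter(
        FileSystem.get(jobConf), jobConf, f"part-$partitionId%05d", Reporter.NULL)
      try {
        // Write every row of this partition through the Hadoop RecordWriter.
        records.foreach { case (k, v) => writer.write(k, v) }
      } finally {
        // The real executeTask additionally calls committer.commitTask here on success.
        writer.close(Reporter.NULL)
      }
    }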

    saveAsHadoopFile also has several simplified overloads in which most parameters are derived automatically.

    def saveAsHadoopFile[F <: OutputFormat[K, V]](
        path: String,
        codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
      val runtimeClass = fm.runtimeClass
      saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
    }
    

    As can be seen, we only need to pass the output path (and, in this overload, a codec).

    private[spark] def keyClass: Class[_] = kt.runtimeClass
    
    private[spark] def valueClass: Class[_] = vt.runtimeClass
    

    Both keyClass and valueClass are resolved at runtime from the implicit ClassTags (kt and vt) of the pair RDD's key and value types.
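
    With those implicits in place, a call to the simplified overload needs only the output format type parameter, the path and, optionally, a codec. A hypothetical example, reusing the pairs RDD from the earlier sketch:

    import org.apache.hadoop.io.compress.GzipCodec

    // Key and value classes are inferred from the ClassTags of the pair RDD's element types.
    pairs.saveAsHadoopFile[TextOutputFormat[Text, IntWritable]](
      "/tmp/save-as-hadoop-file-gzip",   // hypothetical output path
      classOf[GzipCodec])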

    • saveAsTextFile

    Saves the RDD as a text file on a Hadoop-supported file system, storing each element as a string; it is itself a simplified wrapper around saveAsHadoopFile.

    def saveAsTextFile(path: String): Unit = withScope {
      // An explicit null Ordering is passed below so the compiler generates the
      // same bytecodes for `saveAsTextFile` across Hadoop versions (see SPARK-2075).
      val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
      val textClassTag = implicitly[ClassTag[Text]]
      val r = this.mapPartitions { iter =>
        val text = new Text()
        iter.map { x =>
          text.set(x.toString)
          (NullWritable.get(), text)
        }
      }
      RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
        .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
    }
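
    A quick, hypothetical usage example (reusing the SparkContext sc from the earlier sketch; an overload of saveAsTextFile also accepts a compression codec class):

    // Each element is written as its toString value, one line per element.
    sc.parallelize(Seq("spark", "hadoop", "hbase"))
      .saveAsTextFile("/tmp/save-as-text-file-output")   // hypothetical output path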
    
    • Other big data systems

    As shown above, saveAsHadoopDataset takes a JobConf; essentially, all of the relevant classes are configured inside it, so by configuring a JobConf directly we can also write to other storage systems, such as HBase.

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}

    object HBaseWriteTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        val tableName = "XXX"
        val quorum = "localhost"
        val port = "2181"

        // Configure the HBase connection (HBaseUtils is the article's helper that builds an
        // HBase Configuration from the ZooKeeper quorum, port and table name)
        val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

        // Wrap the HBase configuration in a JobConf and use TableOutputFormat as the output format
        val jobConf = new JobConf(conf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

        // Write data to HBase
        val indataRDD = sc.makeRDD(Array("20180723_02,10", "20180723_03,10", "20180818_03,50"))

        indataRDD.map(_.split(",")).map { arr =>
          // The row key is the first field; the second field goes into column info:clict_count
          val put = new Put(Bytes.toBytes(arr(0)))
          put.add(Bytes.toBytes("info"), Bytes.toBytes("clict_count"), Bytes.toBytes(arr(1)))
          (new ImmutableBytesWritable, put)
        }.saveAsHadoopDataset(jobConf)

        sc.stop()
      }
    }
    
