【Spark Java API】Action(6)—saveAs


Author: 小飞_侠_kobe | Published 2016-08-22 12:11

    saveAsTextFile


    Official documentation:

    Save this RDD as a text file, using string representations of elements.
    

    Function prototype:

    def saveAsTextFile(path: String): Unit
    def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
    

    saveAsTextFile writes the RDD to a file system as a text file, using the string representation of each element.

    Source code analysis:

    def saveAsTextFile(path: String): Unit = withScope {
      // https://issues.apache.org/jira/browse/SPARK-2075
      //
      // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
      // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
      // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
      // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
      // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
      //
      // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
      // same bytecodes for `saveAsTextFile`.
      val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
      val textClassTag = implicitly[ClassTag[Text]]
      val r = this.mapPartitions { iter =>
        val text = new Text()
        iter.map { x =>
          text.set(x.toString)
          (NullWritable.get(), text)
        }
      }
      RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
        .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
    }
    /** 
    * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class 
    * supporting the key and value types K and V in this RDD. 
    */
    def saveAsHadoopFile(
          path: String,
          keyClass: Class[_],
          valueClass: Class[_],
          outputFormatClass: Class[_ <: OutputFormat[_, _]],
          conf: JobConf = new JobConf(self.context.hadoopConfiguration),
          codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
      // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
      val hadoopConf = conf
      hadoopConf.setOutputKeyClass(keyClass)
      hadoopConf.setOutputValueClass(valueClass)
      // Doesn't work in Scala 2.9 due to what may be a generics bug
      // TODO: Should we uncomment this for Scala 2.10?
      // conf.setOutputFormat(outputFormatClass)
      hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
      for (c <- codec) {
        hadoopConf.setCompressMapOutput(true)
        hadoopConf.set("mapred.output.compress", "true")
        hadoopConf.setMapOutputCompressorClass(c)
        hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
        hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
      }
      // Use configured output committer if already set
      if (conf.getOutputCommitter == null) {
        hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
      }
      FileOutputFormat.setOutputPath(hadoopConf,
        SparkHadoopWriter.createPathFromString(path, hadoopConf))
      saveAsHadoopDataset(hadoopConf)
    }
    
    /** 
    * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for 
    * that storage system. The JobConf should set an OutputFormat and any output paths required 
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop 
    * MapReduce job. 
    */
    def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {  
      // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).  
      val hadoopConf = conf  
      val wrappedConf = new SerializableConfiguration(hadoopConf)  
      val outputFormatInstance = hadoopConf.getOutputFormat  
      val keyClass = hadoopConf.getOutputKeyClass  
      val valueClass = hadoopConf.getOutputValueClass  
      if (outputFormatInstance == null) {    
        throw new SparkException("Output format class not set")  
      }  
      if (keyClass == null) {    
        throw new SparkException("Output key class not set")  
      }  
      if (valueClass == null) {    
        throw new SparkException("Output value class not set")  
      }  
      SparkHadoopUtil.get.addCredentials(hadoopConf)  
      logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +    valueClass.getSimpleName + ")")  
      if (isOutputSpecValidationEnabled) {    
        // FileOutputFormat ignores the filesystem parameter    
        val ignoredFs = FileSystem.get(hadoopConf)    
        hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)  
      }  
      val writer = new SparkHadoopWriter(hadoopConf)  
      writer.preSetup()  
      val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
        val config = wrappedConf.value
        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
        // around by taking a mod. We expect that no task will be attempted 2 billion times.
        val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
        val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
        writer.setup(context.stageId, context.partitionId, taskAttemptId)
        writer.open()
        var recordsWritten = 0L
        Utils.tryWithSafeFinally {
          while (iter.hasNext) {
            val record = iter.next()
            writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
            // Update bytes written metric every few records
            maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
            recordsWritten += 1
          }
        } {
          writer.close()
        }
        writer.commit()
        bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
        outputMetrics.setRecordsWritten(recordsWritten)
      }
      self.context.runJob(self, writeToFile)  
      writer.commitJob()
    }
    
    

    As the source shows, saveAsTextFile is built on top of saveAsHadoopFile. Because saveAsHadoopFile operates on pair RDDs, saveAsTextFile first converts the RDD into a pair RDD of type (NullWritable, Text) via rddToPairRDDFunctions and then performs the actual write through saveAsHadoopFile.
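
    To make this concrete, here is a hedged Java sketch that reproduces the same steps by hand: map each element to a (NullWritable, Text) pair and write the pair RDD with saveAsHadoopFile and the old-API TextOutputFormat (org.apache.hadoop.mapred). The output path is illustrative, and the snippet assumes the same javaSparkContext as the examples below plus imports of NullWritable, Text, TextOutputFormat and scala.Tuple2:

    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);

    // The same pairing saveAsTextFile performs internally: the key is NullWritable,
    // the value is the element's string representation wrapped in a Text.
    JavaPairRDD<NullWritable, Text> pairs = javaRDD.mapToPair(
            x -> new Tuple2<NullWritable, Text>(NullWritable.get(), new Text(x.toString())));

    // TextOutputFormat omits NullWritable keys, so this should produce the same files
    // as javaRDD.saveAsTextFile("/user/tmp_manual") would.
    pairs.saveAsHadoopFile("/user/tmp_manual",
            NullWritable.class, Text.class, TextOutputFormat.class);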

    Example:

    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
    javaRDD.saveAsTextFile("/user/tmp");
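
    The codec overload shown in the function prototype writes compressed part files. A minimal variant of the example above (output path illustrative; assumes org.apache.hadoop.io.compress.GzipCodec is imported and available on the classpath):

    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);
    // Each part file is run through the supplied Hadoop compression codec.
    javaRDD.saveAsTextFile("/user/tmp_gzip", GzipCodec.class);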
    

    saveAsObjectFile


    Official documentation:

    Save this RDD as a SequenceFile of serialized objects.
    

    Function prototype:

    def saveAsObjectFile(path: String): Unit
    

    saveAsObjectFile serializes the elements of the RDD and stores them to a file (a SequenceFile of serialized objects).

    Source code analysis:

    def saveAsObjectFile(path: String): Unit = withScope {  
      this.mapPartitions(iter => iter.grouped(10).map(_.toArray))    
        .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))    
        .saveAsSequenceFile(path)
    }
    
    def saveAsSequenceFile(    
        path: String,    
        codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {  
      def anyToWritable[U <% Writable](u: U): Writable = u  
      // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and  
      // valueWritableClass at the compile time. To implement that, we need to add type parameters to  
      // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a  
      // breaking change.  
      val convertKey = self.keyClass != keyWritableClass  
      val convertValue = self.valueClass != valueWritableClass  
      logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +    valueWritableClass.getSimpleName + ")" )  
      val format = classOf[SequenceFileOutputFormat[Writable, Writable]]  
      val jobConf = new JobConf(self.context.hadoopConfiguration)  
      if (!convertKey && !convertValue) {    
        self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)  
      } else if (!convertKey && convertValue) {    
        self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(      
          path, keyWritableClass, valueWritableClass, format, jobConf, codec)  
      } else if (convertKey && !convertValue) {    
        self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(      
          path, keyWritableClass, valueWritableClass, format, jobConf, codec)  
      } else if (convertKey && convertValue) {    
        self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(      
          path, keyWritableClass, valueWritableClass, format, jobConf, codec)  
      }
    }
    

    As the source shows, saveAsObjectFile is implemented on top of saveAsSequenceFile: the RDD is first converted into a pair RDD of type (NullWritable, BytesWritable), which is then written out by saveAsSequenceFile. The Spark Java API does not expose saveAsSequenceFile itself; saveAsObjectFile is otherwise used in the same way as saveAsTextFile.

    Example:

    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
    javaRDD.saveAsObjectFile("/user/tmp");
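
    To check what was written, the hedged sketch below reads the output back (same /user/tmp path as the example above; it assumes imports of NullWritable, BytesWritable and JavaPairRDD). It also shows that the files are an ordinary SequenceFile of (NullWritable, BytesWritable) records, matching the source analysis above:

    // Deserialize the serialized batches (up to 10 elements each) back into the elements.
    JavaRDD<Integer> restored = javaSparkContext.objectFile("/user/tmp");
    System.out.println(restored.collect());

    // The same directory can also be read as a plain SequenceFile whose values
    // are the serialized batches, one BytesWritable record per batch.
    JavaPairRDD<NullWritable, BytesWritable> raw = javaSparkContext.sequenceFile(
            "/user/tmp", NullWritable.class, BytesWritable.class);
    System.out.println("serialized batches: " + raw.count());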
    
