- Spark RDD: strip the parentheses and write out a single CSV
rating is a DataFrame; convert it to an RDD (a cleaner variant is sketched below):
val avgs = rating.rdd
  .map(t => (t(0), t(1)).toString.replaceAll("\\(", "").replaceAll("\\)", "")) // "(asin,avg)" -> "asin,avg"
  .collect()
Output:
printToFile(new File("Output/task1.csv")) {
  p => avgs.foreach(p.println) // but this has no header row
}
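Row already provides mkString(sep), so a hedged alternative sketch can skip the tuple-toString-plus-replaceAll step (assuming rating holds only the two output columns; avgs2 is an illustrative name):
val avgs2 = rating.rdd
  .map(_.mkString(",")) // renders each Row directly as "asin,rating_avg"
  .collect()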
- Writing the DataFrame out directly creates a folder containing the CSV part file(s) plus a _SUCCESS marker (see the rename sketch after these variants for getting a true single file)
// Creates a folder, not a single file
import org.apache.spark.sql.SaveMode
val saveOptions = Map("header" -> "true", "path" -> "Output/Firstname_Li_task1.csv")
rating.coalesce(1)
  .write.mode(SaveMode.Overwrite).format("csv")
  .options(saveOptions)
  .save()
rating.write.option("header", "true").csv("Output/Firstname_Li_task1.csv") // shorthand, same folder output
rating.repartition(1)
  .write.mode(SaveMode.Overwrite).format("com.databricks.spark.csv")
  .option("header", "true")
  .save("Output/Firstname_Li_task2.csv")
rating.toJavaRDD
  .coalesce(1)
  .saveAsTextFile("Firstname_Li_task1.csv") // also creates a folder, and with no header
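All of the writers above leave a folder behind. One way to end up with a true single .csv file is to rename the lone part file afterwards; a minimal sketch, assuming the Hadoop FileSystem API bundled with Spark and the coalesce(1)/repartition(1) output from above (the target file name is illustrative):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val outDir = new Path("Output/Firstname_Li_task1.csv") // the folder Spark created
val partFile = fs.globStatus(new Path(outDir, "part-*.csv"))(0).getPath // the single part file
fs.rename(partFile, new Path("Output/Firstname_Li_task1_single.csv"))
fs.delete(outDir, true) // drop the folder and its _SUCCESS marker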
- Write a single CSV that also has a header row
import java.io._

// Open the file, write the header row first, then hand the writer to the caller.
def printToFile(f: File)(op: PrintWriter => Unit): Unit = {
  val p = new PrintWriter(f)
  p.write("asin,rating_avg\n") // header
  try { op(p) }
  finally { p.close() }
}
val avgs = rating.rdd
  .map(t => (t(0), t(1)).toString.replaceAll("\\(", "").replaceAll("\\)", ""))
  .collect()
printToFile(new File("Output/Firstname_Li_task1.csv")) {
  p => avgs.foreach(p.println)
}
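One caveat on this last approach: collect() pulls every row to the driver, which is fine for a small averages table but will not scale to large outputs; in that case the DataFrame writer with the header option plus the part-file rename sketched above is the safer route.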