Official documentation
There are two ways to convert an RDD to a DataFrame. The first uses reflection via a case class, but that approach has significant limitations (before Scala 2.11, a case class could hold at most 22 fields), so the programmatic approach is used more often. For the reflection approach, see the official documentation.
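For comparison, the reflection approach can be sketched as follows. This is a minimal sketch against the same sample file; the case class name `Emp` and the column names are assumptions chosen to match the five columns of the data below, not names from the original post.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class matching the five columns of the sample file:
// id, name, gender, salary, comm. Field types are inferred by reflection.
case class Emp(id: String, name: String, gender: String, salary: Long, comm: String)

object ReflectionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ReflectionApp").getOrCreate()
    import spark.implicits._ // required for the rdd.toDF() conversion

    val df = spark.sparkContext
      .textFile("file:///D:\\RZ-G6\\CustomText\\test01.txt")
      .map(_.split(","))
      .map(a => Emp(a(0).trim, a(1).trim, a(2).trim, a(3).trim.toLong, a(4).trim))
      .toDF() // schema is derived from the Emp case class by reflection

    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

The limitation is that the schema is fixed at compile time by the case class; the programmatic approach below builds the schema at runtime instead.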
The programmatic approach takes three steps:
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
Text data
```
10000,PK,0,100000,200000
10001,Jepson,0,99999,199999
10002,17er,1,2000,5
10003,laoer,2,2001,6
10004,laoliang,1,2002,7
```
Code
```scala
package com.soul.bigdata.spark.sql02

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object TextRDDApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("TextRDDApp").getOrCreate()

    // Step 1: convert the original RDD to an RDD of Rows.
    val rowRDD = spark.sparkContext.textFile("file:///D:\\RZ-G6\\CustomText\\test01.txt")
      .map(_.split(","))
      .map(attributes => Row(
        attributes(0).trim,
        attributes(1).trim,
        attributes(2).trim,
        attributes(3).trim.toLong, // salary must be a Long to match LongType in the schema
        attributes(4).trim
      ))

    // Step 2: build a StructType matching the structure of the Rows.
    val schema = StructType(Array(
      StructField("id", StringType, false),
      StructField("name", StringType, false),
      StructField("gender", StringType, false),
      StructField("salary", LongType, false),
      StructField("comm", StringType, false)
    ))

    // Step 3: apply the schema to the RDD of Rows.
    val df = spark.createDataFrame(rowRDD, schema)
    df.show()

    spark.stop()
  }
}
```
![](https://img.haomeiwen.com/i9221434/60ed921a5986b626.png)
Error

```
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint
```
The error above is caused by a type mismatch between the Row values and the schema. If you declare every field in the schema as StringType, the error goes away; if you want to declare salary as LongType, then you must call toLong on the salary value when building the rowRDD.
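The fix can be illustrated without a Spark session: LongType maps to SQL bigint, so the corresponding slot in each Row must already hold a Long when the Row is built. A minimal sketch (the helper `parseRecord` is hypothetical, introduced here only for illustration):

```scala
// Parse one CSV line into the typed values the LongType-based schema expects.
// Without the .toLong on the salary field, Spark fails at runtime with
// "java.lang.String is not a valid external type for schema of bigint".
def parseRecord(line: String): (String, String, String, Long, String) = {
  val a = line.split(",").map(_.trim)
  (a(0), a(1), a(2), a(3).toLong, a(4))
}
```

The conversion happens once per record at parse time, so the schema check performed by `createDataFrame` then sees a value of the declared type.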