1) 反射方式：基于 case class，前提是事先知道数据的字段名和字段类型。
2) 编程方式：基于 Row，适用于事先不知道列结构的情况，需要在运行时用 StructType 构造 schema 并应用到 RDD[Row] 上。
优先选择反射方式。下面给出两种方式的示例。
package bl.test.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * Demonstrates the two ways of turning an RDD of text lines into a DataFrame:
 *
 *  1. [[inferReflection]]: the schema is inferred by reflection from the [[Info]] case class.
 *  2. [[program]]: the schema is built programmatically as a `StructType` and applied to an
 *     RDD of `Row`s.
 *
 * Both read comma-separated `id,name,age` lines and print the rows whose age is greater than 30,
 * once through the DataFrame API and once through Spark SQL.
 */
object DataFrameRDDApp {

  /** Input file used when no path is given on the command line or to the example methods. */
  val DefaultInputPath: String = "file:////home/zy/Desktop/success.txt"

  /** Name of the temporary view both examples register for the SQL query. */
  private val ViewName = "infos"

  /**
   * Runs both examples on a local session and always stops the session afterwards.
   *
   * @param args optional; `args(0)` overrides [[DefaultInputPath]]
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()
    val path = args.headOption.getOrElse(DefaultInputPath)
    // try/finally so the session is released even if one of the examples throws.
    try {
      inferReflection(spark, path)
      program(spark, path)
    } finally {
      spark.stop()
    }
  }

  /**
   * Programmatic approach: the columns are described explicitly with a `StructType`
   * and applied to an RDD of `Row`s.
   *
   * @param spark active session
   * @param path  text file with one `id,name,age` record per line
   */
  def program(spark: SparkSession, path: String = DefaultInputPath): Unit = {
    val infoRDD = readFields(spark, path)
      .map(fields => Row(fields(0).toInt, fields(1), fields(2).toInt))
    val structType = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))
    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()
    showAgeOver30(spark, infoDF)
  }

  /**
   * Reflection approach: the schema is inferred from the fields of [[Info]].
   *
   * @param spark active session
   * @param path  text file with one `id,name,age` record per line
   */
  def inferReflection(spark: SparkSession, path: String = DefaultInputPath): Unit = {
    // Brings the implicit Encoder[Info] and the RDD -> DataFrame `toDF` conversion into scope.
    import spark.implicits._
    val infoDF = readFields(spark, path)
      .map(fields => Info(fields(0).toInt, fields(1), fields(2).toInt))
      .toDF()
    infoDF.show()
    showAgeOver30(spark, infoDF)
  }

  /**
   * Reads `path` as text and splits every non-blank line on commas.
   *
   * Whitespace is trimmed from each line and each field, and blank lines are skipped.
   * This handles inputs such as an extra empty line at the end of the file or
   * "1, tom, 20". Without it, `toInt` would throw on those inputs.
   */
  private def readFields(spark: SparkSession, path: String): RDD[Array[String]] =
    spark.sparkContext.textFile(path)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split(",").map(_.trim))

  /**
   * Prints the rows of `df` whose `age` column is greater than 30.
   *
   * This is done first with the DataFrame API. The rows are then printed again with Spark SQL,
   * after registering `df` as a temporary view. Any existing view with the same name is replaced.
   */
  private def showAgeOver30(spark: SparkSession, df: DataFrame): Unit = {
    df.filter(df.col("age") > 30).show()
    df.createOrReplaceTempView(ViewName)
    spark.sql(s"select * from $ViewName where age > 30").show()
  }

  /** One parsed input record; its fields drive the reflection-inferred schema. */
  final case class Info(id: Int, name: String, age: Int)
}
网友评论