一. Multi-source data -> DataFrame
1、Hive table
import org.apache.spark.sql.SparkSession

// Create the session; enableHiveSupport() is required to query Hive tables
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // implicit conversions between RDDs and DataFrames

val sqlDF = spark.sql("select * from tb_userprofile limit 10")
sqlDF.show()
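Once the result is a DataFrame, it can be registered as a temporary view and queried again with SQL. A minimal sketch; the view name userprofile_sample is an assumption, not part of the original example:

// Register the DataFrame as a temporary view, then run a second SQL query on it
sqlDF.createOrReplaceTempView("userprofile_sample") // view name is hypothetical
val countDF = spark.sql("select count(*) as cnt from userprofile_sample")
countDF.show()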
2、Data from a URL
import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
import org.apache.spark.sql.functions._
import spark.implicits._

// Fetch the file over HTTP, split it into lines, and drop the header line
val peopleRdd = sc.parallelize(
  IOUtils.toString(
    new URL("https://raw.githubusercontent.com/apache/spark/master/examples/src/main/resources/people.csv"),
    Charset.forName("utf8")).split("\n")).filter(s => !s.startsWith("name"))

// Each line has the form "name;age;job": split on ";" and project the parts into named columns
val peopleDF = peopleRdd.toDF().withColumn("tmp", split($"value", ";")).select(
  $"tmp".getItem(0).as("name"),
  $"tmp".getItem(1).as("age"),
  $"tmp".getItem(2).as("job")
)
peopleDF.show()
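All three columns come back as strings. If a typed column is needed, it can be cast after the split; a minimal sketch:

// Cast age from string to integer; the other columns stay as strings
val typedPeopleDF = peopleDF.withColumn("age", $"age".cast("int"))
typedPeopleDF.printSchema()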
3、JSON
import spark.implicits._

// By default spark.read.json expects one JSON object per line (JSON Lines format)
val peopleDF = spark.read.json("data.json")
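If the file instead holds one pretty-printed JSON document spanning several lines, the multiLine option handles it (available since Spark 2.2). A short sketch reusing the same assumed file name:

// Read a multi-line (non-JSON-Lines) file; "data.json" is the file name assumed above
val multiLineDF = spark.read.option("multiLine", "true").json("data.json")
multiLineDF.printSchema()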
4、CSV
Steps for reading a CSV file:
- Define the column names and their types with a case class; the field names must match the column names in the file
- Read the CSV into a DataFrame
- Convert it to a Dataset with as (see the code below)
case class People(name: String, age: String, job: String) // field names must match the CSV header

import spark.implicits._ // provides the Encoder needed by .as[People]

val people_ds = spark
  .read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv("people_data.csv")
  .as[People]
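The benefit of the Dataset conversion is typed, field-level access. A short usage sketch; the job value "Developer" is an assumed example:

// Filter with a plain Scala predicate over the case class fields
people_ds.filter(p => p.job == "Developer").show()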
5、TSV
// A TSV file is read with the same "csv" format; only the delimiter changes
val chatDF = spark.read.format("csv")
  .option("header", "false")   // the file has no header line
  .option("delimiter", "\t")   // tab-separated instead of comma-separated
  .load(path)                  // path: location of the TSV file
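With header set to false, Spark names the columns _c0, _c1, ...; they can be renamed positionally with toDF. A sketch assuming a three-column chat file; the column names are made up for illustration:

// Rename the positional columns _c0.._c2; these names are hypothetical
val namedChatDF = chatDF.toDF("user", "time", "message")
namedChatDF.show(5)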