1.操作Parquet文件数据
读取数据
val userDF = spark.read.format(“parquet”).load(“file:///root/spark/spark-2.0.2->bin-hadoop2.7/examples/src/main/resources/users.parquet”)
写数据
以json的形式写在一个新的路径下的文件下
源数据是parquet类型的文件,读取到之后的数据通过write方法,以json的形式重新存储到新的文件中.
命令代码如下:
userDF.select("name","favorite_color").write.format("json").save("file:///root/spark/tmp/jsonout")
操作hdfs中的parquet文件。
摘自:https://www.cnblogs.com/yszd/p/10107443.html
package code.parquet
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Created by zhen on 2018/12/11.
*/
object ParquetIO {
// 指定hdfs根节点
private val hdfsRoot = "hdfs://172.20.32.163:8020"
// 获取HDFS路径
def getPath(path: String): Path = {
if (path.toLowerCase().startsWith("hdfs://")) {
new Path(path)
} else {
new Path(hdfsRoot + path)
}
}
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("parquet").master("local[2]").getOrCreate()
spark.sparkContext.setLogLevel("WARN") // 设置日志级别为WARN
val fsUri = new URI(hdfsRoot)
val fs = FileSystem.get(fsUri, new Configuration())
val path = hdfsRoot + "/YXFK/compute/KH_JLD"
val has = fs.exists(getPath(path))
if(has){
// 读取hdfs文件系统parquet数据
val dataFrame = spark.read.parquet(path)
dataFrame.show(10)
// 筛选,过滤数据
val result = dataFrame.select("JLDBH", "JLDDZ", "JLDMC", "JLFSDM", "CJSJ")
.filter("JLDDZ is not null AND JLFSDM = 3")
.sort("JLDBH")
result.show(10)
// 写入部分数据到本地
result.write.mode(SaveMode.Overwrite).parquet("E:\\result")
}
// 读取本地parquet数据
val localDataFrame = spark.read.parquet("E:\\jld.parquet")
localDataFrame.show(10)
// 读取写入数据验证
val resultSpace = spark.read.parquet("E:\\result")
resultSpace.show(10)
}
}
write.mode("overwrite") // 覆盖已经存在的文件
write.mode("error")或者默认(default) // 如果文件存在,则报错
write.mode("append") // 向存在的文件追加
write.mode("ignore") // 如果文件已存在,则忽略保存操作
读取Hive表数据形成DataFrame
val df = sqlContext.read.table("common.emp")
结果保存json格式
df.select("empno","ename").write.mode("ignore").format("json").save("/wangyao/result/json")
df.select("empno","ename").write.mode("error").format("json").save("/wangyao/result/json")
df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/wangyao/result/json")
df.select("empno","ename").write.mode("append").format("json").save("/wangyao/result/json")\
上面虽然在追加的时候加上了sal,但是解析没有问题
sqlContext.read.format("json").load("/wangyao/result/json").show()
结果保存parquet格式
df.select("empno", "ename", "deptno").write.format("parquet").save("/wangyao/result/parquet01")
df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/wangyao/result/parquet01") ## 加上sal导致解析失败,读取数据的时候
读取parquet格式
sqlContext.read.format("parquet").load("/wangyao/result/parquet01").show(100)
sqlContext.read.format("parquet").load("/wangyao/result/parquet01/part*").show(100)
partitionBy按照给定的字段进行分区
df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/wangyao/result/parquet02")
sqlContext.read.format("parquet").load("/wangyao/result/parquet02").show(100)
2.Spark下的Top K问题
《Spark 大数据处理》 by 高彦杰
整个排序取 TopK 的实现:
object TopK {
val K = 3
def main(args: Array[String]) {
// 执行 wordcount
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
val textRDD = sparkSession.sparkContext.textFile("/apps/recommend/models/wangyao/wordcount.txt")
textRDD.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.map { pair => pair.swap } //
.sortByKey(true, 2)
.top(3)
.foreach(println)
sparkSession.stop()
}
}
topK问题使用堆来作为数据结构最合适。
有空再补充一下。
Reference
《Spark 大数据处理》 -高彦杰
https://blog.csdn.net/yuzx2008/article/details/50732151
https://github.com/agile6v/scala-tutorials/blob/master/tutorial/7_11.md
网友评论