美文网首页
scala 读取外部数据源以及topK问题

scala 读取外部数据源以及topK问题

作者: wong11 | 来源:发表于2019-03-28 11:07 被阅读0次

1.操作Parquet文件数据

读取数据

val userDF = spark.read.format(“parquet”).load(“file:///root/spark/spark-2.0.2->bin-hadoop2.7/examples/src/main/resources/users.parquet”)

写数据

以json的形式写在一个新的路径下的文件下
源数据是parquet类型的文件,读取到之后的数据通过write方法,以json的形式重新存储到新的文件中.
命令代码如下:

userDF.select("name","favorite_color").write.format("json").save("file:///root/spark/tmp/jsonout")

操作hdfs中的parquet文件。
摘自:https://www.cnblogs.com/yszd/p/10107443.html

package code.parquet

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Created by zhen on 2018/12/11.
  */
object ParquetIO {
  // 指定hdfs根节点
  private val hdfsRoot = "hdfs://172.20.32.163:8020"
  // 获取HDFS路径
  def getPath(path: String): Path = {
    if (path.toLowerCase().startsWith("hdfs://")) {
      new Path(path)
    } else {
      new Path(hdfsRoot + path)
    }
  }
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("parquet").master("local[2]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // 设置日志级别为WARN
    val fsUri = new URI(hdfsRoot)
    val fs = FileSystem.get(fsUri, new Configuration())
    val path = hdfsRoot + "/YXFK/compute/KH_JLD"
    val has = fs.exists(getPath(path))
    if(has){
      // 读取hdfs文件系统parquet数据
      val dataFrame = spark.read.parquet(path)
      dataFrame.show(10)
      // 筛选,过滤数据
      val result = dataFrame.select("JLDBH", "JLDDZ", "JLDMC", "JLFSDM", "CJSJ")
        .filter("JLDDZ is not null AND JLFSDM = 3")
        .sort("JLDBH")
      result.show(10)
      // 写入部分数据到本地
      result.write.mode(SaveMode.Overwrite).parquet("E:\\result")
    }
    // 读取本地parquet数据
    val localDataFrame = spark.read.parquet("E:\\jld.parquet")
    localDataFrame.show(10)
    // 读取写入数据验证
    val resultSpace = spark.read.parquet("E:\\result")
    resultSpace.show(10)
  }
}
write.mode("overwrite")                                                    // 覆盖已经存在的文件
write.mode("error")或者默认(default)                                        // 如果文件存在,则报错
write.mode("append")                                                       // 向存在的文件追加
write.mode("ignore")                                                       // 如果文件已存在,则忽略保存操作
读取Hive表数据形成DataFrame
val df = sqlContext.read.table("common.emp")

结果保存json格式
df.select("empno","ename").write.mode("ignore").format("json").save("/wangyao/result/json")
df.select("empno","ename").write.mode("error").format("json").save("/wangyao/result/json")
df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/wangyao/result/json")
df.select("empno","ename").write.mode("append").format("json").save("/wangyao/result/json")\
上面虽然在追加的时候加上了sal,但是解析没有问题
sqlContext.read.format("json").load("/wangyao/result/json").show()

结果保存parquet格式
df.select("empno", "ename", "deptno").write.format("parquet").save("/wangyao/result/parquet01")
df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/wangyao/result/parquet01") ## 加上sal导致解析失败,读取数据的时候
读取parquet格式
sqlContext.read.format("parquet").load("/wangyao/result/parquet01").show(100)
sqlContext.read.format("parquet").load("/wangyao/result/parquet01/part*").show(100)

partitionBy按照给定的字段进行分区
df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/wangyao/result/parquet02")
sqlContext.read.format("parquet").load("/wangyao/result/parquet02").show(100)

2.Spark下的Top K问题

《Spark 大数据处理》 by 高彦杰

整个排序取 TopK 的实现:

object TopK {

  val K = 3

  def main(args: Array[String]) {
    // 执行 wordcount
    val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
    val textRDD = sparkSession.sparkContext.textFile("/apps/recommend/models/wangyao/wordcount.txt")
    textRDD.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map { pair => pair.swap }     //
      .sortByKey(true, 2)
      .top(3)
      .foreach(println)

    sparkSession.stop()

  }
}

topK问题使用堆来作为数据结构最合适。
有空再补充一下。

Reference

《Spark 大数据处理》 -高彦杰
https://blog.csdn.net/yuzx2008/article/details/50732151
https://github.com/agile6v/scala-tutorials/blob/master/tutorial/7_11.md

相关文章

网友评论

      本文标题:scala 读取外部数据源以及topK问题

      本文链接:https://www.haomeiwen.com/subject/ycfogftx.html