美文网首页
pyspark save_mongo

pyspark save_mongo

作者: EricLee_1900 | 来源:发表于2020-03-27 12:22 被阅读0次

    功能: 通过SQLContext提供的reader读取器读取外部数据源的数据,并形成DataFrame

    1.源码的主要方法

    format:给定数据源数据格式类型,eg: json、parquet

    schema:给定读入数据的数据schema,可以不给定,不给定的情况下,进行数据类型推断

    option:添加参数,这些参数在数据解析的时候可能会用到

    load:

    有参数的指从参数给定的path路径中加载数据,比如:JSON、Parquet...

    无参数的指直接加载数据(根据option相关的参数)

    jdbc:读取关系型数据库的数据

    json:读取json格式数据

    parquet:读取parquet格式数据

    orc: 读取orc格式数据

    table:直接读取关联的Hive数据库中的对应表数据

    val df=sqlContext.read.format("json").load("spark/sql/people.json")

    功能:将DataFrame的数据写出到外部数据源

    1.源码主要方法

    mode: 给定数据输出的模式

      `overwrite`: overwrite the existing data.

      `append`: append the data. 

      `ignore`: ignore the operation (i.e. no-op).

      `error`: default option, throw an exception at runtime.

    format:给定输出文件所属类型, eg: parquet、json

    option: 给定参数

    partitionBy:给定分区字段(要求输出的文件类型支持数据分区)

    save: 触发数据保存操作 --> 当该API被调用后,数据已经写出到具体的数据保存位置了

    jdbc:将数据输出到关系型数据库

      当mode为append的时候,数据追加方式是:

        先将表中的所有索引删除

        再追加数据

      没法实现,数据不存在就添加,存在就更新的需求

    读取Hive表数据形成DataFrame

    val df = sqlContext.read.table("common.emp")

    结果保存json格式

    df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")

    df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")

    df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")

    df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\

    上面虽然在追加的时候加上了sal,但是解析没有问题

    sqlContext.read.format("json").load("/beifeng/result/json").show()

    结果保存parquet格式

    df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")

    df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal导致解析失败,读取数据的时候

    sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)

    sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)

    partitionBy按照给定的字段进行分区

    df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")

    sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

    相关文章

      网友评论

          本文标题:pyspark save_mongo

          本文链接:https://www.haomeiwen.com/subject/ydaoehtx.html