美文网首页
pyspark save_mongo

pyspark save_mongo

作者: EricLee_1900 | 来源:发表于2020-03-27 12:22 被阅读0次

功能: 通过SQLContext提供的reader读取器读取外部数据源的数据,并形成DataFrame

1.源码的主要方法

format:给定数据源数据格式类型,eg: json、parquet

schema:给定读入数据的数据schema,可以不给定,不给定的情况下,进行数据类型推断

option:添加参数,这些参数在数据解析的时候可能会用到

load:

有参数的指从参数给定的path路径中加载数据,比如:JSON、Parquet...

无参数的指直接加载数据(根据option相关的参数)

jdbc:读取关系型数据库的数据

json:读取json格式数据

parquet:读取parquet格式数据

orc: 读取orc格式数据

table:直接读取关联的Hive数据库中的对应表数据

val df=sqlContext.read.format("json").load("spark/sql/people.json")

功能:将DataFrame的数据写出到外部数据源

1.源码主要方法

mode: 给定数据输出的模式

  `overwrite`: overwrite the existing data.

  `append`: append the data. 

  `ignore`: ignore the operation (i.e. no-op).

  `error`: default option, throw an exception at runtime.

format:给定输出文件所属类型, eg: parquet、json

option: 给定参数

partitionBy:给定分区字段(要求输出的文件类型支持数据分区)

save: 触发数据保存操作 --> 当该API被调用后,数据已经写出到具体的数据保存位置了

jdbc:将数据输出到关系型数据库

  当mode为append的时候,数据追加方式是:

    先将表中的所有索引删除

    再追加数据

  没法实现,数据不存在就添加,存在就更新的需求

读取Hive表数据形成DataFrame

val df = sqlContext.read.table("common.emp")

结果保存json格式

df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")

df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")

df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")

df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\

上面虽然在追加的时候加上了sal,但是解析没有问题

sqlContext.read.format("json").load("/beifeng/result/json").show()

结果保存parquet格式

df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")

df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal导致解析失败,读取数据的时候

sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)

sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)

partitionBy按照给定的字段进行分区

df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")

sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

相关文章

网友评论

      本文标题:pyspark save_mongo

      本文链接:https://www.haomeiwen.com/subject/ydaoehtx.html