功能: 通过SQLContext提供的reader读取器读取外部数据源的数据,并形成DataFrame
1.源码的主要方法
format:给定数据源数据格式类型,eg: json、parquet
schema:给定读入数据的数据schema,可以不给定,不给定的情况下,进行数据类型推断
option:添加参数,这些参数在数据解析的时候可能会用到
load:
有参数的指从参数给定的path路径中加载数据,比如:JSON、Parquet...
无参数的指直接加载数据(根据option相关的参数)
jdbc:读取关系型数据库的数据
json:读取json格式数据
parquet:读取parquet格式数据
orc: 读取orc格式数据
table:直接读取关联的Hive数据库中的对应表数据
val df=sqlContext.read.format("json").load("spark/sql/people.json")
功能:将DataFrame的数据写出到外部数据源
1.源码主要方法
mode: 给定数据输出的模式
`overwrite`: overwrite the existing data.
`append`: append the data.
`ignore`: ignore the operation (i.e. no-op).
`error`: default option, throw an exception at runtime.
format:给定输出文件所属类型, eg: parquet、json
option: 给定参数
partitionBy:给定分区字段(要求输出的文件类型支持数据分区)
save: 触发数据保存操作 --> 当该API被调用后,数据已经写出到具体的数据保存位置了
jdbc:将数据输出到关系型数据库
当mode为append的时候,数据追加方式是:
先将表中的所有索引删除
再追加数据
没法实现,数据不存在就添加,存在就更新的需求
读取Hive表数据形成DataFrame
val df = sqlContext.read.table("common.emp")
结果保存json格式
df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")
df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")
df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")
df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\
上面虽然在追加的时候加上了sal,但是解析没有问题
sqlContext.read.format("json").load("/beifeng/result/json").show()
结果保存parquet格式
df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")
df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal导致解析失败,读取数据的时候
sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)
sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)
partitionBy按照给定的字段进行分区
df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")
sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)
网友评论