2020/07/07 -
引言
这部分的内容属于实际情况中使用的内容,这些内容可能很多时候都用不到的,所以这里我主要按照使用过程中用到了再来记录。
- 加载json文件
1. 加载json文件
读取json文件有两种,跟csv一样,一种是通过saprkSession来获取具体的DataFrame形式的数据;另一种是通过sc.textFile来获取行组织的文本数据。下面先来说第二种。
#spark是sparkSession实例
import josn
##加载数据
json_rdd = spark.sparkContext.textFile(file_path).map(lambda x: json.loads(x))
query_rdd = json_rdd.filter(lambda x: x['type'] == 'query').cache()
##保存数据
query_rdd.map(lambda x: json.dumps(x)).saveAsTextFile(file_path)
这种也是书上介绍的json处理方式,这种形式必须保证json文件是按行组织的,这样才能保证每行都是一个独立的json数据;然后在进行后续的操作中,对元素的操作时可以按照字典的形式来执行。
第一种方法就比较高级的了。他是利用底层spark.sql的东西来获取数据,还可以将数据的格式给推导出来;json的数据格式比较关键的内容就是这个层级的数据格式怎么办。使用如下方法可以获取带有格式的DataFrame,这种读取方式类似CSV一样,也是可以指定opinions的。
#spark是sparkSession实例
json_df = spark.read.json(file_path)
这里以suricata的event数据为例,读取之后执行命令df.printSchema()
直接调用顶层的键是没有意义的,就是null。一般来说如果要选择或者过滤df的数据,使用df.filter
或者df.select
的方式,这个key传入的方式也是按照层级关系来组织比如anomaly下code这个。
df.select(df.anomaly.code).show()
df.select(df['anomaly']['code']).show()
df.select('anomaly.code').show()
网友评论