美文网首页Spark学习
《Learning Spark》第五章:加载和保存文件

《Learning Spark》第五章:加载和保存文件

作者: VChao | 来源:发表于2020-07-07 18:55 被阅读0次

    2020/07/07 -

    引言

    这部分的内容属于实际情况中使用的内容,这些内容可能很多时候都用不到的,所以这里我主要按照使用过程中用到了再来记录。

    1. 加载json文件

    1. 加载json文件

    读取json文件有两种,跟csv一样,一种是通过saprkSession来获取具体的DataFrame形式的数据;另一种是通过sc.textFile来获取行组织的文本数据。下面先来说第二种。

    #spark是sparkSession实例
    import josn
    ##加载数据
    json_rdd = spark.sparkContext.textFile(file_path).map(lambda x: json.loads(x))
    query_rdd = json_rdd.filter(lambda x: x['type'] == 'query').cache()
    
    ##保存数据
    query_rdd.map(lambda x: json.dumps(x)).saveAsTextFile(file_path)
    

    这种也是书上介绍的json处理方式,这种形式必须保证json文件是按行组织的,这样才能保证每行都是一个独立的json数据;然后在进行后续的操作中,对元素的操作时可以按照字典的形式来执行。

    第一种方法就比较高级的了。他是利用底层spark.sql的东西来获取数据,还可以将数据的格式给推导出来;json的数据格式比较关键的内容就是这个层级的数据格式怎么办。使用如下方法可以获取带有格式的DataFrame,这种读取方式类似CSV一样,也是可以指定opinions的。

    #spark是sparkSession实例
    json_df = spark.read.json(file_path)
    

    这里以suricata的event数据为例,读取之后执行命令df.printSchema()

    层级关系

    直接调用顶层的键是没有意义的,就是null。一般来说如果要选择或者过滤df的数据,使用df.filter或者df.select的方式,这个key传入的方式也是按照层级关系来组织比如anomaly下code这个。

    df.select(df.anomaly.code).show()
    df.select(df['anomaly']['code']).show()
    df.select('anomaly.code').show()
    

    相关文章

      网友评论

        本文标题:《Learning Spark》第五章:加载和保存文件

        本文链接:https://www.haomeiwen.com/subject/euuzqktx.html