美文网首页
pyspark案例系列4-dataframe输出到单个文件夹的解

pyspark案例系列4-dataframe输出到单个文件夹的解

作者: 只是甲 | 来源:发表于2022-06-10 17:30 被阅读0次

    一. 需求

    今天有个需求,需要将pyspark里面的dataframe输入到单个txt文件中。

    我们知道pyspark是分布式的,所以写文件的时候也是多线程操作,此时就会生成多个不同的文件。

    二. 解决方案

    2.1 dataframe写本地磁盘

    查阅资料发现保存到本地文件系统(file:///)只有再local模式下才能生效,在cluster模式下(不论是yarn-client还是yarn-cluster)都无法使用

    2.1.1 csv文件格式

    /home/spark/dir1 这个目录由程序来创建,如果已存在会报错

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    df1 = spark.sql("select * from test.emp")
    
    df1.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("file:///home/spark/dir1")
    

    运行程序:
    spark-submit --master local 1.py

    测试记录:


    image.png

    2.1.1 text文件格式

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    df1 = spark.sql("select * from test.emp")
    
    df1.coalesce(1).write.format("text").options(header='true', inferschema='true').save("file:///home/spark/dir2")
    

    运行报错:
    pyspark.sql.utils.AnalysisException: u'Text data source does not support int data type.;'

    需要调整代码,写text之前将dataframe保存为字符串类型

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat_ws
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    df1 = spark.sql("select * from test.emp")
    
    df2 = df1.select(concat_ws(',',*df1.columns).alias('data'))
    
    df2.coalesce(1).write.format("text").options(header='true', inferschema='true').save("file:///home/spark/dir2")
    
    image.png

    2.1 dataframe写HDFS

    以写csv格式为例

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    df1 = spark.sql("select * from test.emp")
    
    df1.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("hdfs://hp1:8020/user/juzhen/dir1")
    

    测试记录:


    image.png image.png

    参考:

    1. https://blog.csdn.net/jingyi130705008/article/details/108236217

    相关文章

      网友评论

          本文标题:pyspark案例系列4-dataframe输出到单个文件夹的解

          本文链接:https://www.haomeiwen.com/subject/wfndlltx.html