一. 需求
今天有个需求,需要将pyspark里面的dataframe输入到单个txt文件中。
我们知道pyspark是分布式的,所以写文件的时候也是多线程操作,此时就会生成多个不同的文件。
二. 解决方案
2.1 dataframe写本地磁盘
查阅资料发现保存到本地文件系统(file:///)只有再local模式下才能生效,在cluster模式下(不论是yarn-client还是yarn-cluster)都无法使用
2.1.1 csv文件格式
/home/spark/dir1 这个目录由程序来创建,如果已存在会报错
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df1 = spark.sql("select * from test.emp")
df1.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("file:///home/spark/dir1")
运行程序:
spark-submit --master local 1.py
测试记录:
image.png
2.1.1 text文件格式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df1 = spark.sql("select * from test.emp")
df1.coalesce(1).write.format("text").options(header='true', inferschema='true').save("file:///home/spark/dir2")
运行报错:
pyspark.sql.utils.AnalysisException: u'Text data source does not support int data type.;'
需要调整代码,写text之前将dataframe保存为字符串类型
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df1 = spark.sql("select * from test.emp")
df2 = df1.select(concat_ws(',',*df1.columns).alias('data'))
df2.coalesce(1).write.format("text").options(header='true', inferschema='true').save("file:///home/spark/dir2")
image.png
2.1 dataframe写HDFS
以写csv格式为例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df1 = spark.sql("select * from test.emp")
df1.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("hdfs://hp1:8020/user/juzhen/dir1")
测试记录:
image.png image.png
网友评论