本文主要是参考在pyspark中操作hdfs文件, 并修改了一些代码中的bug
利用pyspark输出主要用的是 SaveAsTextFile
,但是这个函数为了实现输出的并行化会输出很多文件,如果想输出单个文件 / 自定义输出文件,应该怎么操作呢?
首先在pyspark中没有直接的接口函数对hdfs文件进行操作,这里使用py4j在python代码中运行java,实现hadoopConfiguration()
的调用
from py4j.java_gateway import JavaGateway
def path(sc, filepath):
"""
创建hadoop path对象
:param sc sparkContext对象
:param filename 文件绝对路径
:return org.apache.hadoop.fs.Path对象
"""
path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
return path_class(filepath)
def get_file_system(sc):
"""
创建FileSystem
:param sc SparkContext
:return FileSystem对象
"""
filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
hadoop_configuration = sc._jsc.hadoopConfiguration()
return filesystem_class.get(hadoop_configuration)
def write(sc, filepath, overwrite=True):
"""
写内容到hdfs文件
:param sc SparkContext
:param filepath 绝对路径
:param content 文件内容
:param overwrite 是否覆盖
"""
filesystem = get_file_system(sc)
out = filesystem.create(path(sc, filepath), overwrite)
return out
使用的方法就是
out = write(sc, output) # 获得输出流
out.write(bytes(【str】, "utf-8"))
out.flush()
out.close()
网友评论