"""
create table dev.bj5_hadoop_fsimage(
path string,
replication int,
modificationtime date,
accesstime date,
preferredblocksize int,
blockscount int,
filesize int,
nsquota int,
dsquota int,
permission string,
username string,
groupname string,
path0 string,
path1 string,
path2 string,
path3 string,
path4 string
)
PARTITIONED BY (pt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
"""
from pyspark import SparkContext
import time
from pyspark.sql import Row, SparkSession, HiveContext
from pyspark.sql.functions import split, concat_ws
def load_file_to_spark(x):
    # Keep only data rows; drop the header line of the delimited fsimage dump,
    # which contains the column name 'Replication'.
    return 'Replication' not in x
if __name__ == '__main__':
    # Target Hive table; matches the DDL in the module docstring above.
    table_name = 'dev.bj5_hadoop_fsimage'
    sc = SparkContext.getOrCreate()
    # Partition value, e.g. '2021-02'.
    s_time = time.strftime("%Y-%m", time.localtime())
    spark = SparkSession.builder.enableHiveSupport().appName('pyspark').getOrCreate()
    sqlContext = HiveContext(sc)
    # lines = sc.textFile("/Users/pausky/Downloads/xml_file1")
    lines = sc.textFile("/tmp/xml_file")
    filt_lines = lines.filter(load_file_to_spark)
    # Each data row is delimited by the custom separator '[<'.
    parts = filt_lines.map(lambda x: x.split('[<'))
    hdfs_file_imager = parts.map(lambda p: Row(path=p[0], replication=p[1], modificationtime=p[2], accesstime=p[3],
                                               preferredblocksize=p[4], blockscount=p[5], filesize=p[6], nsquota=p[7],
                                               dsquota=p[8], permission=p[9], username=p[10], groupname=p[11]))
    df_fsimage = spark.createDataFrame(hdfs_file_imager)
    # Split the path into its leading directory components (path0..path4).
    split_col = split(df_fsimage['path'], "/")
    df_fsimage_with_muti_c = (df_fsimage.withColumn('path0', split_col.getItem(1))
                              .withColumn('path1', split_col.getItem(2))
                              .withColumn('path2', split_col.getItem(3))
                              .withColumn('path3', split_col.getItem(4))
                              .withColumn('path4', split_col.getItem(5)))
    # df_fsimage_with_c = df_fsimage_with_muti_c.withColumn('same_c', concat_ws('/', df_fsimage_with_muti_c['path0'],
    #                                                                           df_fsimage_with_muti_c['path1'],
    #                                                                           df_fsimage_with_muti_c['path2'],
    #                                                                           df_fsimage_with_muti_c['path3'],
    #                                                                           df_fsimage_with_muti_c['path4']))
    # df_fsimage_delete_c = df_fsimage_with_c.drop('path0').drop('path1').drop('path2').drop('path3').drop('path4')
    # df_fsimage_delete_c.write.saveAsTable("dev.bj5_hadoop_fsimage", mode='overwrite')
    # df_fsimage_delete_c.write.orc("/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/")
    # df_fsimage_delete_c.write.option("path", "/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/").saveAsTable('dev.bj5_hadoop_fsimage')
    # Register the DataFrame and load it into the partitioned Hive table.
    df_fsimage_with_muti_c.createOrReplaceTempView("bj5_hdfs_fsimage_table")
    sqlContext.sql('insert overwrite table ' + table_name + ' partition (pt = "' + s_time + '") '
                   'select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,'
                   'filesize,nsquota,dsquota,permission,username,groupname,path0,path1,path2,path3,path4 '
                   'from bj5_hdfs_fsimage_table')
    # t = spark.sql('select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,filesize,nsquota,dsquota,permission,username,groupname,same_c from bj5_hdfs_fsimage_table ')
    # df_fsimage_with_c.write.parquet('')
    # t = spark.sql('select same_c,sum(FileSize)/1024/1024 as size from bj5_hdfs_fsimage_table where same_c="user/hive/123" group by same_c order by size ')
    # print(t.show(10))
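    # Once loaded, per-directory usage can be pulled from the partitioned table with a
    # query along the lines of the commented-out aggregation above. A minimal sketch,
    # assuming the table and columns from the DDL in the docstring; the partition value
    # is only an example:
    #
    #   SELECT path0, path1, SUM(filesize) / 1024 / 1024 AS size_mb
    #   FROM dev.bj5_hadoop_fsimage
    #   WHERE pt = '2021-02'
    #   GROUP BY path0, path1
    #   ORDER BY size_mb DESC;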