Analyzing fsimage with Spark

Author: 至垚 | Published 2021-02-23 19:20

    """
    create table dev.bj5_hadoop_fsimage(
    path string,
    replication int,
    modificationtime date,
    accesstime date,
    preferredblocksize int,
    blockscount int,
    filesize bigint,
    nsquota int,
    dsquota int,
    permission string,
    username string,
    groupname string,
    path0 string,
    path1 string,
    path2 string,
    path3 string,
    path4 string
    )
    PARTITIONED BY (pt string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
    """

    from pyspark import SparkContext
    import time
    from pyspark.sql import Row, SparkSession, HiveContext
    from pyspark.sql.functions import split, concat_ws

    def load_file_to_spark(x):
        # Keep data lines; drop the header line of the Delimited dump,
        # which contains the column name "Replication".
        if 'Replication' in x:
            pass
        else:
            return x

    if __name__ == '__main__':
        table_name = ''  # target Hive table, e.g. dev.bj5_hadoop_fsimage from the DDL above
        sc = SparkContext.getOrCreate()
        s_time = time.strftime("%Y-%m", time.localtime())  # partition value, e.g. "2021-02"
        spark = SparkSession.builder.enableHiveSupport().appName('pyspark').getOrCreate()
        # HiveContext is deprecated; spark.sql() would work the same way.
        sqlContext = HiveContext(sc)
        # lines = sc.textFile("/Users/pausky/Downloads/xml_file1")
        lines = sc.textFile("/tmp/xml_file")
        filt_lines = lines.filter(load_file_to_spark)
        # Assumes the default tab delimiter of the oiv Delimited processor.
        parts = filt_lines.map(lambda x: x.split('\t'))
        hdfs_file_imager = parts.map(lambda p: Row(
            path=p[0], replication=p[1], modificationtime=p[2], accesstime=p[3],
            preferredblocksize=p[4], blockscount=p[5], filesize=p[6], nsquota=p[7],
            dsquota=p[8], permission=p[9], username=p[10], groupname=p[11]))
        df_fsimage = spark.createDataFrame(hdfs_file_imager)
        # Split the path into its first five components so usage can later be
        # rolled up per directory level.
        split_col = split(df_fsimage['path'], "/")
        df_fsimage_with_muti_c = df_fsimage.withColumn('path0', split_col.getItem(1)) \
            .withColumn('path1', split_col.getItem(2)).withColumn('path2', split_col.getItem(3)) \
            .withColumn('path3', split_col.getItem(4)).withColumn('path4', split_col.getItem(5))
        # df_fsimage_with_c = df_fsimage_with_muti_c.withColumn('same_c', concat_ws('/', df_fsimage_with_muti_c['path0'],
        #     df_fsimage_with_muti_c['path1'], df_fsimage_with_muti_c['path2'],
        #     df_fsimage_with_muti_c['path3'], df_fsimage_with_muti_c['path4']))
        # df_fsimage_delete_c = df_fsimage_with_c.drop('path0').drop('path1').drop('path2').drop('path3').drop('path4')
        # df_fsimage_delete_c.write.saveAsTable("dev.bj5_hadoop_fsimage", mode='overwrite')
        # df_fsimage_delete_c.write.orc("/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/")
        # df_fsimage_delete_c.write.option("path", "/user/hive/warehouse/dev.db/bj5_hadoop_fsimage/pt=2021-02/").saveAsTable('dev.bj5_hadoop_fsimage')
        df_fsimage_with_muti_c.createOrReplaceTempView("bj5_hdfs_fsimage_table")
        sqlContext.sql('insert overwrite table ' + table_name + ' partition (pt = "' + s_time + '") '
                       'select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,'
                       'filesize,nsquota,dsquota,permission,username,groupname,path0,path1,path2,path3,path4 '
                       'from bj5_hdfs_fsimage_table')
        # t = spark.sql('select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,filesize,nsquota,dsquota,permission,username,groupname,same_c from bj5_hdfs_fsimage_table')
        # df_fsimage_with_c.write.parquet('')
        # t = spark.sql('select same_c,sum(FileSize)/1024/1024 as size from bj5_hdfs_fsimage_table where same_c="user/hive/123" group by same_c order by size')
        # print(t.show(10))
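
With the partition loaded, the table from the DDL above can be queried to see which directories use the most space, in the spirit of the commented-out aggregation query at the end of the script. A small follow-up sketch, assuming the partition written above (the "2021-02" value mirrors the s_time format):

    from pyspark.sql import SparkSession

    # Roll up total size (GB) per second-level directory for one partition,
    # largest first. Table name and partition value follow the DDL and the
    # s_time logic in the loading job.
    spark = SparkSession.builder.enableHiveSupport().appName('fsimage-report').getOrCreate()
    top_dirs = spark.sql("""
        select path0, path1, sum(filesize) / 1024 / 1024 / 1024 as size_gb
        from dev.bj5_hadoop_fsimage
        where pt = '2021-02'
        group by path0, path1
        order by size_gb desc
    """)
    top_dirs.show(20, truncate=False)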
