
Data Support [Data Comparison]

Author: Coder小咚 | Published 2022-02-23 10:55

    Preface

    Our project architecture recently went through a major overhaul, and many data tables had to be regenerated. Before the new data goes live it needs to be validated against the old data, so my lead asked me to build a simple tool to compare the two.

    Verification logic

    First, define the comparison metrics for a table; then compute each metric on both tables and compare the values. The metrics are:

    • Count: count
    • Mean: mean
    • Standard deviation: stddev
    • Maximum: max
    • Minimum: min
    • Lower quartile: 25%
    • Median: 50%
    • Upper quartile: 75%
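These are the statistics that Spark's `DataFrame.summary()` emits, and pandas exposes essentially the same set through `Series.describe()`. A minimal sketch on a toy column (pure pandas, so it runs without a cluster; the values mirror the sample salary data later in this post). One caveat: Spark's `summary()` reports approximate percentiles drawn from actual data values, while pandas interpolates between values, so the quartiles of the two tools can legitimately differ on the same data.

```python
import pandas as pd

# Toy "salary" column matching the sample output shown later in the post.
salary = pd.Series([10000, 20000], name="salary")

# describe() yields: count, mean, std, min, 25%, 50%, 75%, max
stats = salary.describe()
print(stats)
```

Here pandas reports the 25% quantile as 12500.0 (interpolated), whereas Spark's `summary()` would report 10000 (an actual data value).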

    Processing code

    # coding=utf-8
    import sys

    import pandas as pd
    from pyspark.sql import SparkSession
    from texttable import Texttable

    pd.options.display.max_columns = None
    pd.options.display.max_rows = None

    # Column order for the transposed summary() output.
    SUMMARY_COLS = ['summary', 'count', 'mean', 'stddev', 'min', 'max', '25%', '50%', '75%']


    def print_summary(title, summary_df):
        """Transpose a Spark summary() DataFrame and print it as a text table."""
        pdf = summary_df.toPandas().set_index("summary").T
        pdf.index.name = 'summary'
        pdf['summary'] = pdf.index
        pdf = pdf[SUMMARY_COLS]

        tb = Texttable()
        tb.set_cols_align(['l'] + ['r'] * 8)
        tb.set_cols_dtype(['t'] * 9)
        tb.set_cols_width([15, 10] + [15] * 7)
        tb.header(pdf.columns.values)
        tb.add_rows(pdf.values, header=False)

        print(title)
        print(tb.draw())
        print("\n")


    def run_task(source_table, target_table):
        """
        Compare the two tables on these metrics:
            count, mean, stddev, min, max, 25%, 50%, 75%

        :param source_table: name of the old (source) Hive table
        :param target_table: name of the new (target) Hive table
        :return: None
        """
        task_name = "data-verification-%s-%s" % (source_table, target_table)
        spark = SparkSession.builder.appName(task_name).enableHiveSupport().getOrCreate()

        s_df = spark.sql("select * from %s" % source_table)
        t_df = spark.sql("select * from %s" % target_table)

        print_summary("Source table statistics:", s_df.summary())
        print_summary("Target table statistics:", t_df.summary())

        spark.stop()


    if __name__ == "__main__":
        source_table = sys.argv[1]
        target_table = sys.argv[2]

        run_task(source_table, target_table)
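Printing the two tables still leaves the actual diffing to the reader's eyes. As a possible extension (not part of the original script), the two transposed summary frames could also be diffed programmatically with pandas `DataFrame.compare` (available since pandas 1.1), which keeps only the cells that differ. The frames below are hypothetical stand-ins for the `toPandas()` results; `summary()` returns everything as strings, so string values are compared here as well:

```python
import pandas as pd

# Hypothetical transposed summary() results for the source and target tables.
src = pd.DataFrame({"count": ["2", "2"], "mean": ["1.5", "15000.0"]},
                   index=["id", "salary"])
tgt = pd.DataFrame({"count": ["2", "2"], "mean": ["1.5", "15001.0"]},
                   index=["id", "salary"])

# Identical cells (and fully identical rows) are dropped; only mismatches remain,
# labelled with the default column suffixes "self" (src) and "other" (tgt).
diff = src.compare(tgt)
print(diff)
```

With the sample frames above, `diff` contains a single row (`salary`) with the mismatched `mean` values side by side; an empty `diff` would mean the two summaries agree on every metric.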
    

    Launch script

    #!/bin/bash

    export LANG="en_US.UTF-8"
    export PYTHONIOENCODING=utf8

    source=$1
    target=$2

    if [ -z "$source" ]; then
      echo "please input source_table"
      exit 1
    else
      echo "source : $source"
    fi

    if [ -z "$target" ]; then
      echo "please input target_table"
      exit 1
    else
      echo "target : $target"
    fi

    spark-submit \
        --master yarn \
        --queue mid \
        --num-executors 10 \
        --driver-memory 5g \
        --executor-memory 15g \
        --executor-cores 2 \
        --py-files ./data-verification.zip \
        scripts/data_diff.py "$source" "$target"
    

    Output

    Source table statistics:
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |     summary     |   count    |      mean       |     stddev      |       min       |       max       |       25%       |       50%       |       75%       |
    +=================+============+=================+=================+=================+=================+=================+=================+=================+
    | id              |          2 |             1.5 | 0.7071067811865 |               1 |               2 |               1 |               1 |               2 |
    |                 |            |                 |             476 |                 |                 |                 |                 |                 |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | name            |          2 |            None |            None |           david |           david |            None |            None |            None |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | salary          |          2 |         15000.0 | 7071.0678118654 |           10000 |           20000 |           10000 |           10000 |           20000 |
    |                 |            |                 |              75 |                 |                 |                 |                 |                 |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | desc            |          2 |            None |            None |  January salary | February salary |            None |            None |            None |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    
    
    Target table statistics:
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    |     summary     |   count    |      mean       |     stddev      |       min       |       max       |       25%       |       50%       |       75%       |
    +=================+============+=================+=================+=================+=================+=================+=================+=================+
    | id              |          2 |             1.5 | 0.7071067811865 |               1 |               2 |               1 |               1 |               2 |
    |                 |            |                 |             476 |                 |                 |                 |                 |                 |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | name            |          2 |            None |            None |           david |           david |            None |            None |            None |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | salary          |          2 |         15000.0 | 7071.0678118654 |           10000 |           20000 |           10000 |           10000 |           20000 |
    |                 |            |                 |              75 |                 |                 |                 |                 |                 |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    | desc            |          2 |            None |            None |  January salary | February salary |            None |            None |            None |
    +-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
    

    This example is built with PySpark, so pay attention to the Python runtime environment. I use Anaconda3; the dependencies to install are pandas and texttable, installed directly on the YARN client machine.
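Installing on the client machine alone works here because pandas and texttable are only used on the driver (the `toPandas()` result and the table rendering). As a sketch, with the hedged note that the packed-environment lines are a common convention rather than something this project uses, and the environment name is hypothetical:

```shell
# Driver-side dependencies, installed into the Anaconda environment.
pip install pandas texttable

# If the executors ever needed these libraries too, one common approach is to
# pack the conda environment and ship it with the job (names are hypothetical):
#   conda pack -n pyspark_env -o pyspark_env.tar.gz
#   spark-submit --archives pyspark_env.tar.gz#environment ...
```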

    Original link: https://www.haomeiwen.com/subject/gbbalrtx.html