Preface
The project architecture recently went through a major overhaul, and many data tables have to be regenerated. Before the new data goes live it needs to be verified against the old data, so my lead asked me to build a simple tool for comparing the two.
Verification Logic
First define the comparison metrics for a table, compute each metric on both tables, and then compare the results (a minimal sketch follows the list). The metrics are:
- Total rows: count
- Mean: mean
- Standard deviation: stddev
- Maximum: max
- Minimum: min
- Lower quartile: 25%
- Median: 50%
- Upper quartile: 75%
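These map one-to-one onto the statistics that Spark's DataFrame.summary() computes (available since Spark 2.3), which is what the script below relies on. A minimal standalone sketch, with made-up sample data:

# Minimal sketch of DataFrame.summary(), which yields exactly the
# metrics listed above; the sample data is made up for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("summary-demo").getOrCreate()
df = spark.createDataFrame([(1, 10000), (2, 20000)], ["id", "salary"])
# One row per statistic: count, mean, stddev, min, 25%, 50%, 75%, max
df.summary().show()
spark.stop()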
Processing Code
# coding=utf-8
import sys

import pandas as pd
from pyspark.sql import SparkSession
from texttable import Texttable

# Print full pandas frames instead of truncating wide/long output
pd.options.display.max_columns = None
pd.options.display.max_rows = None
def run_task(source_table, target_table):
    """
    Compare two tables on the statistics produced by DataFrame.summary():
    1. row count: count
    2. mean: mean
    3. standard deviation: stddev
    4. minimum / maximum: min, max
    5. percentiles: 25%, 50%, 75%
    :param source_table: name of the original table
    :param target_table: name of the regenerated table
    :return: None; prints one statistics table per input table
    """
    task_name = "data-verification-%s-%s" % (source_table, target_table)
    spark = SparkSession.builder.appName(task_name).enableHiveSupport().getOrCreate()
    s_df = spark.sql("select * from %s" % source_table)
    t_df = spark.sql("select * from %s" % target_table)
    # summary() returns one row per statistic; transpose so that each
    # table column becomes a row and each statistic becomes a column
    ss_df = s_df.summary()
    st_df = t_df.summary()
    ss_pdf = ss_df.toPandas().set_index("summary").T
    print("Source table statistics:")
    ss_pdf.index.name = 'summary'
    ss_pdf['summary'] = ss_pdf.index
    ss_pdf = ss_pdf[['summary', 'count', 'mean', 'stddev', 'min', 'max', '25%', '50%', '75%']]
    ss_tb = Texttable()
    ss_tb.set_cols_align(['l', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r'])
    ss_tb.set_cols_dtype(['t', 't', 't', 't', 't', 't', 't', 't', 't'])
    ss_tb.set_cols_width([15, 10, 15, 15, 15, 15, 15, 15, 15])
    ss_tb.header(ss_pdf.columns.values)
    ss_tb.add_rows(ss_pdf.values, header=False)
    print(ss_tb.draw())
    print("\n")
    st_pdf = st_df.toPandas().set_index("summary").T
    print("Target table statistics:")
    st_pdf.index.name = 'summary'
    st_pdf['summary'] = st_pdf.index
    st_pdf = st_pdf[['summary', 'count', 'mean', 'stddev', 'min', 'max', '25%', '50%', '75%']]
    st_tb = Texttable()
    st_tb.set_cols_align(['l', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r'])
    st_tb.set_cols_dtype(['t', 't', 't', 't', 't', 't', 't', 't', 't'])
    st_tb.set_cols_width([15, 10, 15, 15, 15, 15, 15, 15, 15])
    st_tb.header(st_pdf.columns.values)
    st_tb.add_rows(st_pdf.values, header=False)
    print(st_tb.draw())
    print("\n")
    spark.stop()
if __name__ == "__main__":
    source_table = sys.argv[1]
    target_table = sys.argv[2]
    run_task(source_table, target_table)
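Eyeballing two wide tables gets tedious once the column count grows. A natural extension is to let pandas report only the disagreements; below is a minimal sketch, assuming the two transposed summary frames ss_pdf and st_pdf built inside run_task above (the helper name summary_diff is my own, not part of the original script):

def summary_diff(ss_pdf, st_pdf):
    """Return (column, statistic, source_value, target_value) tuples
    for every statistic on which the two tables disagree.

    Assumes both frames have the run_task shape: one row per table
    column, one column per statistic, plus a 'summary' name column.
    """
    merged = ss_pdf.merge(st_pdf, on="summary", suffixes=("_src", "_tgt"))
    stats = ["count", "mean", "stddev", "min", "max", "25%", "50%", "75%"]
    mismatches = []
    for stat in stats:
        bad = merged[merged[stat + "_src"] != merged[stat + "_tgt"]]
        for _, row in bad.iterrows():
            mismatches.append((row["summary"], stat,
                               row[stat + "_src"], row[stat + "_tgt"]))
    return mismatches

# Example: print only the differences instead of two full tables.
# for col, stat, src, tgt in summary_diff(ss_pdf, st_pdf):
#     print("%s.%s: %s != %s" % (col, stat, src, tgt))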
Launch Script
#!/bin/bash
export LANG="en_US.UTF-8"
export PYTHONIOENCODING=utf8
source=$1
target=$2

if [ -z "$source" ]; then
    echo "please provide source_table"
    exit 1
else
    echo "source: $source"
fi

if [ -z "$target" ]; then
    echo "please provide target_table"
    exit 1
else
    echo "target: $target"
fi
spark-submit \
--master yarn \
--queue mid \
--num-executors 10 \
--driver-memory 5g \
--executor-memory 15g \
--executor-cores 2 \
--py-files ./data-verification.zip \
scripts/data_diff.py "$source" "$target"
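Assuming the script above is saved as run_diff.sh (the file name and the table names here are arbitrary examples), a run looks like: sh run_diff.sh dw.salary_old dw.salary_new, where both arguments are fully qualified Hive table names.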
Output
Source table statistics:
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| summary | count | mean | stddev | min | max | 25% | 50% | 75% |
+=================+============+=================+=================+=================+=================+=================+=================+=================+
| id | 2 | 1.5 | 0.7071067811865 | 1 | 2 | 1 | 1 | 2 |
| | | | 476 | | | | | |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| name | 2 | None | None | david | david | None | None | None |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| salary | 2 | 15000.0 | 7071.0678118654 | 10000 | 20000 | 10000 | 10000 | 20000 |
| | | | 75 | | | | | |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| desc | 2 | None | None | 一月份工资 | 二月份工资 | None | None | None |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
Target table statistics:
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| summary | count | mean | stddev | min | max | 25% | 50% | 75% |
+=================+============+=================+=================+=================+=================+=================+=================+=================+
| id | 2 | 1.5 | 0.7071067811865 | 1 | 2 | 1 | 1 | 2 |
| | | | 476 | | | | | |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| name | 2 | None | None | david | david | None | None | None |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| salary | 2 | 15000.0 | 7071.0678118654 | 10000 | 20000 | 10000 | 10000 | 20000 |
| | | | 75 | | | | | |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
| desc | 2 | None | None | 一月份工资 | 二月份工资 | None | None | None |
+-----------------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
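Every statistic matches column for column between the two tables, so the regenerated table passes verification against the original.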
Summary
This case was built with PySpark, so pay attention to the Python runtime environment: I used Anaconda3, and the required dependencies, pandas and texttable, were installed directly on the YARN client machine.
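For example, something like pip install pandas texttable (or the conda equivalent) inside the Anaconda3 environment on that machine should be sufficient.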