6 Large-Scale Data Preprocessing with PySpark

Author: 7125messi | Published 2018-01-09 22:03

    With the foundations from the previous chapters in place, we can now use PySpark for data analysis and mining. In practice, 80%–90% of the time typically goes into data preprocessing, which is the basis for all of the modeling work that follows. This article walks through large-scale data preprocessing with PySpark from the standpoint of real business scenarios, to keep things practical.

    1 Duplicate Data

    # Spark 2.0+
    df = spark.createDataFrame([
            (1, 144.5, 5.9, 33, 'M'),
            (2, 167.2, 5.4, 45, 'M'),
            (3, 124.1, 5.2, 23, 'F'),
            (4, 144.5, 5.9, 33, 'M'),
            (5, 133.2, 5.7, 54, 'F'),
            (3, 124.1, 5.2, 23, 'F'),
            (5, 129.2, 5.3, 42, 'M')], 
                               ['id', 
                                'weight', 
                                'height', 
                                'age', 
                                'gender']
                              )
    
    # Spark 1.6.2
    df = sqlContext.createDataFrame([
            (1, 144.5, 5.9, 33, 'M'),
            (2, 167.2, 5.4, 45, 'M'),
            (3, 124.1, 5.2, 23, 'F'),
            (4, 144.5, 5.9, 33, 'M'),
            (5, 133.2, 5.7, 54, 'F'),
            (3, 124.1, 5.2, 23, 'F'),
            (5, 129.2, 5.3, 42, 'M')], 
                               ['id', 
                                'weight', 
                                'height', 
                                'age', 
                                'gender']
                              )
    

    1.1 Checking for duplicates

    Use .distinct() to drop rows that are exact duplicates of other rows:

    print('Count of rows: {0}'.format(df.count()))
    print('Count of distinct rows: {0}'.format(df.distinct().count()))
    
    Count of rows: 7
    Count of distinct rows: 6
    

    Use the .dropDuplicates(...) method to remove the duplicate rows; for details see the Pyspark.SQL module notes in this collection:

    df = df.dropDuplicates()
    df.show()
    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    |  5| 129.2|   5.3| 42|     M|
    |  4| 144.5|   5.9| 33|     M|
    |  3| 124.1|   5.2| 23|     F|
    |  2| 167.2|   5.4| 45|     M|
    |  1| 144.5|   5.9| 33|     M|
    |  5| 133.2|   5.7| 54|     F|
    +---+------+------+---+------+
    

    Remove duplicate rows while ignoring the id column:

    print('Count of ids: {0}'.format(df.count()))
    
    #print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))
    print('Count of distinct ids:{0}'.format(df.select(["weight","height","age","gender"]).distinct().count()))
    
    Count of ids: 6
    Count of distinct ids:5
    

    Use .dropDuplicates(...) to handle the duplicate rows again, this time passing the subset parameter:

    # df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
    df =df.dropDuplicates(["weight","height","age","gender"])
    df.show()
    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    |  5| 133.2|   5.7| 54|     F|
    |  4| 144.5|   5.9| 33|     M|
    |  2| 167.2|   5.4| 45|     M|
    |  5| 129.2|   5.3| 42|     M|
    |  3| 124.1|   5.2| 23|     F|
    +---+------+------+---+------+
    

    Use the .agg(...) function to compute the total number of ids and the number of distinct ids:

    import pyspark.sql.functions as fn
    
    df.agg(
        fn.count('id').alias('id_count'),
        fn.countDistinct('id').alias('id_distinct')
    ).show()
    +--------+-----------+
    |id_count|id_distinct|
    +--------+-----------+
    |       5|          4|
    +--------+-----------+
    

    Give every row a unique ID, which amounts to adding a new numeric column:

    df.withColumn('new_id', fn.monotonically_increasing_id()).show()
    +---+------+------+---+------+------------+
    | id|weight|height|age|gender|      new_id|
    +---+------+------+---+------+------------+
    |  5| 133.2|   5.7| 54|     F|283467841536|
    |  4| 144.5|   5.9| 33|     M|352187318272|
    |  2| 167.2|   5.4| 45|     M|386547056640|
    |  5| 129.2|   5.3| 42|     M|438086664192|
    |  3| 124.1|   5.2| 23|     F|463856467968|
    +---+------+------+---+------+------------+
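
    Note that monotonically_increasing_id() only guarantees unique, monotonically increasing values; the gaps depend on partitioning, so the numbers are not consecutive. If consecutive IDs are needed, a window function can be used instead. A minimal sketch (the ordering columns are just an example; a global ordering funnels all rows through one partition, so it does not scale well, and on Spark 1.6 window functions require a HiveContext):

    from pyspark.sql import Window
    import pyspark.sql.functions as fn
    
    # row_number() numbers the rows 1..N according to the window's ordering
    w = Window.orderBy('weight', 'height', 'age', 'gender')
    df.withColumn('new_id', fn.row_number().over(w)).show()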
    

    1.2 Observing missing values

    df_miss = sqlContext.createDataFrame([
            (1, 143.5, 5.6, 28,   'M',  100000),
            (2, 167.2, 5.4, 45,   'M',  None),
            (3, None , 5.2, None, None, None),
            (4, 144.5, 5.9, 33,   'M',  None),
            (5, 133.2, 5.7, 54,   'F',  None),
            (6, 124.1, 5.2, None, 'F',  None),
            (7, 129.2, 5.3, 42,   'M',  76000),
        ], ['id', 'weight', 'height', 'age', 'gender', 'income'])
    df_miss.show()
    +---+------+------+----+------+------+
    | id|weight|height| age|gender|income|
    +---+------+------+----+------+------+
    |  1| 143.5|   5.6|  28|     M|100000|
    |  2| 167.2|   5.4|  45|     M|  null|
    |  3|  null|   5.2|null|  null|  null|
    |  4| 144.5|   5.9|  33|     M|  null|
    |  5| 133.2|   5.7|  54|     F|  null|
    |  6| 124.1|   5.2|null|     F|  null|
    |  7| 129.2|   5.3|  42|     M| 76000|
    +---+------+------+----+------+------+
    

    Count the number of missing values in each row:

    df_miss.rdd.\
    map(lambda row: (sum([c is None for c in row]), row['id'])).\
    sortByKey(ascending=False).collect()    # (number of missing values, id)
    [(4, 3), (2, 6), (1, 2), (1, 4), (1, 5), (0, 1), (0, 7)]
    
    df_miss.where('id = 3').show()
    +---+------+------+----+------+------+
    | id|weight|height| age|gender|income|
    +---+------+------+----+------+------+
    |  3|  null|   5.2|null|  null|  null|
    +---+------+------+----+------+------+
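
    The same per-row counts can also be computed without leaving the DataFrame API, which avoids the round trip through the RDD. A sketch (the column name n_missing is just illustrative):

    from functools import reduce
    import pyspark.sql.functions as fn
    
    # fn.isnull(c) is True where column c is null; cast the flags to int and add them up
    n_missing = reduce(
        lambda a, b: a + b,
        [fn.isnull(c).cast('int') for c in df_miss.columns]
    )
    df_miss.select('id', n_missing.alias('n_missing')).orderBy('n_missing', ascending=False).show()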
    

    Percentage of missing observations in each column:

    # fn.count(c) / fn.count('*') gives, for each column, the share of non-missing records
    
    # fn.count('*') -- the '*' stands in for a column name and counts all rows, nulls included
    # the leading * in .agg(*[...]) unpacks the list into separate arguments for .agg(...)
    df_miss.agg(*[
        (1.00 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns
    ]).show()
    
    +----------+------------------+--------------+------------------+------------------+------------------+
    |id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
    +----------+------------------+--------------+------------------+------------------+------------------+
    |       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
    +----------+------------------+--------------+------------------+------------------+------------------+
    
    

    Drop the income column, since most of it is missing; its missing share is about 71%:

    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
    df_miss_no_income.show()
    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    |  1| 143.5|   5.6|  28|     M|
    |  2| 167.2|   5.4|  45|     M|
    |  3|  null|   5.2|null|  null|
    |  4| 144.5|   5.9|  33|     M|
    |  5| 133.2|   5.7|  54|     F|
    |  6| 124.1|   5.2|null|     F|
    |  7| 129.2|   5.3|  42|     M|
    +---+------+------+----+------+
    

    Use the .dropna(...) method to drop rows; thresh=3 keeps only rows that have at least three non-null values:

    df_miss_no_income.dropna(thresh=3).show()
    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    |  1| 143.5|   5.6|  28|     M|
    |  2| 167.2|   5.4|  45|     M|
    |  4| 144.5|   5.9|  33|     M|
    |  5| 133.2|   5.7|  54|     F|
    |  6| 124.1|   5.2|null|     F|
    |  7| 129.2|   5.3|  42|     M|
    +---+------+------+----+------+
    

    Use the .fillna(...) method to impute the missing observations:

    means = df_miss_no_income.agg(*[
            fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
    
    # handle gender separately: fill its missing values with a 'missing' label
    means['gender'] = 'missing'
    
    df_miss_no_income.fillna(means).show()
    
    +---+-------------+------+---+-------+
    | id|       weight|height|age| gender|
    +---+-------------+------+---+-------+
    |  1|        143.5|   5.6| 28|      M|
    |  2|        167.2|   5.4| 45|      M|
    |  3|140.283333333|   5.2| 40|missing|
    |  4|        144.5|   5.9| 33|      M|
    |  5|        133.2|   5.7| 54|      F|
    |  6|        124.1|   5.2| 40|      F|
    |  7|        129.2|   5.3| 42|      M|
    +---+-------------+------+---+-------+
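
    On Spark 2.2 and later, pyspark.ml.feature.Imputer offers mean or median imputation for numeric columns. A minimal sketch, assuming Spark 2.2+ (Imputer expects double/float columns, so the integer age column is cast first; the categorical gender column still has to be filled separately as above):

    from pyspark.ml.feature import Imputer  # Spark 2.2+
    
    num_cols = ['weight', 'height', 'age']
    # cast the numeric columns to double so Imputer accepts them
    df_num = df_miss_no_income.select(
        'id', 'gender', *[df_miss_no_income[c].cast('double').alias(c) for c in num_cols]
    )
    
    imputer = Imputer(strategy='mean', inputCols=num_cols,
                      outputCols=[c + '_imp' for c in num_cols])
    imputer.fit(df_num).transform(df_num).show()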
    

    1.3 Outliers

    df_outliers = sqlContext.createDataFrame([
            (1, 143.5, 5.3, 28),
            (2, 154.2, 5.5, 45),
            (3, 342.3, 5.1, 99),
            (4, 144.5, 5.5, 33),
            (5, 133.2, 5.4, 54),
            (6, 124.1, 5.1, 21),
            (7, 129.2, 5.3, 42),
        ], ['id', 'weight', 'height', 'age'])
    df_outliers.show()
    +---+------+------+---+
    | id|weight|height|age|
    +---+------+------+---+
    |  1| 143.5|   5.3| 28|
    |  2| 154.2|   5.5| 45|
    |  3| 342.3|   5.1| 99|
    |  4| 144.5|   5.5| 33|
    |  5| 133.2|   5.4| 54|
    |  6| 124.1|   5.1| 21|
    |  7| 129.2|   5.3| 42|
    +---+------+------+---+
    

    Flag outliers using the interquartile range (IQR) rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are treated as outliers.

    # approxQuantile(col, probabilities, relativeError) New in version 2.0.
    cols = ['weight', 'height', 'age']
    bounds = {}
    
    for col in cols:
        quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
        
        IQR = quantiles[1] - quantiles[0]
        
        bounds[col] = [
            quantiles[0] - 1.5 * IQR, 
            quantiles[1] + 1.5 * IQR
        ]
    

    The bounds dictionary holds the lower and upper cutoffs for each feature:

    bounds
    {'age': [9.0, 51.0],
     'height': [4.8999999995, 5.6],
     'weight': [115.0, 146.8499999997]}
    

    Flag the outliers in the sample data:

    # | on Column expressions is a logical OR: a value is flagged when it falls
    # below the lower bound or above the upper bound
    outliers = df_outliers.select(*(
        ['id'] +
        [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o')
         for c in cols]
    ))
    outliers.show()
    +---+--------+--------+-----+
    | id|weight_o|height_o|age_o|
    +---+--------+--------+-----+
    |  1|   false|   false|false|
    |  2|    true|   false|false|
    |  3|    true|   false| true|
    |  4|   false|   false|false|
    |  5|   false|   false| true|
    |  6|   false|   false|false|
    |  7|   false|   false|false|
    +---+--------+--------+-----+
    

    List the weight and age values that deviate markedly from the rest of the distribution (the outliers):

    df_outliers = df_outliers.join(outliers, on='id')
    
    df_outliers.filter('weight_o').select('id', 'weight').show()
    df_outliers.filter('age_o').select('id', 'age').show()
    +---+------+
    | id|weight|
    +---+------+
    |  3| 342.3|
    |  2| 154.2|
    +---+------+
    
    +---+---+
    | id|age|
    +---+---+
    |  5| 54|
    |  3| 99|
    +---+---+
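
    To actually drop the flagged rows rather than just inspect them, filter on the boolean flag columns that were joined in above. A small sketch (~ is logical NOT on Column expressions):

    # keep only the rows where none of the three features was flagged as an outlier
    df_no_outliers = df_outliers.filter(
        ~df_outliers['weight_o'] & ~df_outliers['height_o'] & ~df_outliers['age_o']
    )
    df_no_outliers.select('id', 'weight', 'height', 'age').show()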
    

    The steps above give us a quick way to clean the data (duplicates, missing values, and outliers).

    2 Getting to Know Your Data

    2.1 Descriptive statistics (EDA)

    Load the data and convert it into a Spark DataFrame.

    # pyspark.sql.types lists all of the data types we can use, e.g. IntegerType() and FloatType()
    from pyspark.sql import types
    fraud = sc.textFile('file:///root/ydzhao/PySpark/Chapter04/ccFraud.csv.gz')  # credit-card fraud dataset
    header = fraud.first()
    header
    '"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"'
    
    fraud.take(3)
    ['"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"',
     '1,1,35,1,3000,4,14,2,0',
     '2,2,2,1,0,9,0,18,0']
    
    fraud.count()
    10000001
    
    # (1) Drop the header row and cast every field to int; the result is still an RDD
    fraud = fraud.filter(lambda row: row != header).map(lambda row: [int(x) for x in row.split(',')])
    fraud.take(3)
    [[1, 1, 35, 1, 3000, 4, 14, 2, 0],
     [2, 2, 2, 1, 0, 9, 0, 18, 0],
     [3, 2, 2, 1, 0, 27, 9, 16, 0]]
    
    # (2) Build the DataFrame schema
    # h[1:-1] strips the surrounding double quotes from each column name
    schema = types.StructType([
        types.StructField(h[1:-1], types.IntegerType(), True)
        for h in header.split(',')
    ])
    
    # (3) Create the DataFrame
    # Spark 2.0+
    # fraud_df = spark.createDataFrame(fraud, schema)
    
    # Spark 1.6.2
    fraud_df = sqlContext.createDataFrame(fraud, schema)
    fraud_df.printSchema()
    root
     |-- custID: integer (nullable = true)
     |-- gender: integer (nullable = true)
     |-- state: integer (nullable = true)
     |-- cardholder: integer (nullable = true)
     |-- balance: integer (nullable = true)
     |-- numTrans: integer (nullable = true)
     |-- numIntlTrans: integer (nullable = true)
     |-- creditLine: integer (nullable = true)
     |-- fraudRisk: integer (nullable = true)
    
    fraud_df.show()
    +------+------+-----+----------+-------+--------+------------+----------+---------+
    |custID|gender|state|cardholder|balance|numTrans|numIntlTrans|creditLine|fraudRisk|
    +------+------+-----+----------+-------+--------+------------+----------+---------+
    |     1|     1|   35|         1|   3000|       4|          14|         2|        0|
    |     2|     2|    2|         1|      0|       9|           0|        18|        0|
    |     3|     2|    2|         1|      0|      27|           9|        16|        0|
    |     4|     1|   15|         1|      0|      12|           0|         5|        0|
    |     5|     1|   46|         1|      0|      11|          16|         7|        0|
    |     6|     2|   44|         2|   5546|      21|           0|        13|        0|
    |     7|     1|    3|         1|   2000|      41|           0|         1|        0|
    |     8|     1|   10|         1|   6016|      20|           3|         6|        0|
    |     9|     2|   32|         1|   2428|       4|          10|        22|        0|
    |    10|     1|   23|         1|      0|      18|          56|         5|        0|
    |    11|     1|   46|         1|   4601|      54|           0|         4|        0|
    |    12|     1|   10|         1|   3000|      20|           0|         2|        0|
    |    13|     1|    6|         1|      0|      45|           2|         4|        0|
    |    14|     2|   38|         1|   9000|      41|           3|         8|        0|
    |    15|     1|   27|         1|   5227|      60|           0|        17|        0|
    |    16|     1|   44|         1|      0|      22|           0|         5|        0|
    |    17|     2|   18|         1|  13970|      20|           0|        13|        0|
    |    18|     1|   35|         1|   3113|      13|           6|         8|        0|
    |    19|     1|    5|         1|   9000|      20|           2|         8|        0|
    |    20|     2|   31|         1|   1860|      21|          10|         8|        0|
    +------+------+-----+----------+-------+--------+------------+----------+---------+
    only showing top 20 rows
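
    On Spark 2.0+ the built-in CSV reader can handle the header and the type casting in one call, which is often simpler than building the schema by hand. A sketch using the same file path as above (inferSchema triggers an extra pass over the data):

    # Spark 2.0+ only
    fraud_df2 = spark.read.csv(
        'file:///root/ydzhao/PySpark/Chapter04/ccFraud.csv.gz',
        header=True,
        inferSchema=True
    )
    fraud_df2.printSchema()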
    

    Use the .groupby(...) method to compute the frequency of the gender column. The sample we are working with is imbalanced with respect to gender.

    fraud_df.groupby('gender').count().show()
    +------+-------+
    |gender|  count|
    +------+-------+
    |     1|6178231|
    |     2|3821769|
    +------+-------+
    

    Use describe() to get summary statistics of the numeric columns:

    desc = fraud_df.describe(['balance', 'numTrans', 'numIntlTrans'])
    desc.show()
    +-------+-----------------+------------------+-----------------+
    |summary|          balance|          numTrans|     numIntlTrans|
    +-------+-----------------+------------------+-----------------+
    |  count|         10000000|          10000000|         10000000|
    |   mean|     4109.9199193|        28.9351871|        4.0471899|
    | stddev|3996.847309737077|26.553781024522852|8.602970115863767|
    |    min|                0|                 0|                0|
    |    max|            41485|               100|               60|
    +-------+-----------------+------------------+-----------------+
    

    Check the skewness of balance:

    fraud_df.agg({'balance': 'skewness'}).show()
    +------------------+
    | skewness(balance)|
    +------------------+
    |1.1818315552996432|
    +------------------+
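
    Other aggregate statistics follow the same pattern; calling the functions module directly makes it easy to compute several of them at once. A sketch:

    import pyspark.sql.functions as fn
    
    fraud_df.agg(
        fn.skewness('balance').alias('balance_skewness'),
        fn.kurtosis('balance').alias('balance_kurtosis'),
        fn.stddev('balance').alias('balance_stddev')
    ).show()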
    

    2.2 Correlations

    At the moment the .corr() method only supports the Pearson correlation coefficient, and only for one pair of columns at a time.
    You can of course convert the Spark DataFrame to a pandas DataFrame and work with it freely from there, but the cost can be high depending on the data volume; later Spark versions may improve on this.

    fraud_df.corr('balance', 'numTrans')
    

    Build a correlation matrix:

    numerical = ['balance', 'numTrans', 'numIntlTrans']
    n_numerical = len(numerical)
    
    corr = []
    
    for i in range(0, n_numerical):
        temp = [None] * i
        for j in range(i, n_numerical):
            temp.append(fraud_df.corr(numerical[i], numerical[j]))
        corr.append(temp)
        
    corr
    [[1.0, 0.00044523140172659576, 0.00027139913398184604],
     [None, 1.0, -0.0002805712819816179],
     [None, None, 1.0]]
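
    An alternative that computes the whole Pearson correlation matrix in a single pass is Statistics.corr from MLlib, which works on an RDD of numeric vectors. A sketch:

    from pyspark.mllib.stat import Statistics
    
    # each RDD element is one observation as a list of numeric values
    corr_matrix = Statistics.corr(
        fraud_df.select(numerical).rdd.map(lambda row: [row[c] for c in numerical]),
        method='pearson'
    )
    print(corr_matrix)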
    

    2.3 Data visualization

    Load the matplotlib and Bokeh packages; these run only in the Python interpreter on the driver.

    %matplotlib inline
    import matplotlib.pyplot as plt
    plt.style.use('ggplot')
    
    import bokeh.charts as chrt
    from bokeh.io import output_notebook
    output_notebook()
    
    Loading BokehJS ... 
    

    2.3.1 Histograms

    Approach 1: aggregate the data on the worker nodes and return only a list of bin edges and the count of each histogram bin to the driver (suited to large datasets).

    (1) Aggregate the data first

    hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)
    type(hists)
    tuple
    

    (2) Plot the histogram with matplotlib

    data = {
        'bins': hists[0][:-1],
        'freq': hists[1]
    }
    fig = plt.figure(figsize=(12,9))
    ax = fig.add_subplot(1, 1, 1)
    ax.bar(data['bins'], data['freq'], width=2000)
    ax.set_title('Histogram of balance')
    plt.savefig('balance.png', dpi=300)
    
    [Figure: histogram of balance, drawn with matplotlib]

    Plot the histogram with Bokeh:

    b_hist = chrt.Bar(data, values='freq', label='bins', title='Histogram of balance')
    chrt.show(b_hist)
    

    Approach 2: bring all of the data to the driver (only suitable for small datasets). Approach 1 is of course the better-performing option.

    data_driver = {'obs': fraud_df.select('balance').rdd.flatMap(lambda row: row).collect()}
    fig = plt.figure(figsize=(12,9))
    ax = fig.add_subplot(1, 1, 1)
    
    ax.hist(data_driver['obs'], bins=20)
    ax.set_title('Histogram of balance using .hist()')
    
    plt.savefig('balance_hist.png', dpi=300)
    
    [Figure: histogram of balance, drawn with matplotlib's .hist()]

    Plot the histogram with Bokeh:

    b_hist_driver = chrt.Histogram(data_driver, values='obs', title='Histogram of balance using .Histogram()', bins=20)
    chrt.show(b_hist_driver)
    

    2.3.2 Interactions between features

    First take a 0.02% sample of the 10,000,000 rows:

    data_sample = fraud_df.sampleBy('gender', {1: 0.0002, 2: 0.0002}).select(numerical)
    data_sample.show()
    +-------+--------+------------+
    |balance|numTrans|numIntlTrans|
    +-------+--------+------------+
    |      0|      15|           0|
    |   4000|       3|           0|
    |   6000|      21|          50|
    |      0|      28|          10|
    |   8630|     100|           0|
    |   4000|      17|           0|
    |   5948|      76|           0|
    |   3000|       9|           4|
    |      0|       5|           0|
    |   1588|      55|           0|
    |   3882|      87|           0|
    |   1756|      12|           0|
    |   4000|       4|           0|
    |      0|      10|           0|
    |   5000|      17|           0|
    |   4188|      50|           0|
    |   3141|       2|           1|
    |   8000|      52|           5|
    |   9000|      40|           0|
    |   2423|      11|           1|
    +-------+--------+------------+
    only showing top 20 rows
    
    
    data_multi = dict([
        (x, data_sample.select(x).rdd.flatMap(lambda row: row).collect()) 
        for x in numerical
    ])
    
    sctr = chrt.Scatter(data_multi, x='balance', y='numTrans')
    
    chrt.show(sctr)
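
    The bokeh.charts API used here was deprecated and later removed from Bokeh, so as a fallback the same sampled data can be plotted with matplotlib. A sketch reusing data_multi from above:

    import matplotlib.pyplot as plt
    
    fig = plt.figure(figsize=(12, 9))
    ax = fig.add_subplot(1, 1, 1)
    ax.scatter(data_multi['balance'], data_multi['numTrans'], alpha=0.5)
    ax.set_xlabel('balance')
    ax.set_ylabel('numTrans')
    ax.set_title('balance vs numTrans (0.02% sample)')
    plt.savefig('balance_vs_numTrans.png', dpi=300)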
    
