DataFrame deduplication, None-value filling, and outlier handling

Author: AntFish | Published 2018-05-23 15:51

    Preparing data for modeling with Spark

    Deduplication

    # initialize a SparkSession
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master("local[*]").appName("shuangyu").getOrCreate()
    
    df = spark.createDataFrame([(1,144.5,5.9,33,'M'),
                               (2,167.2,5.4,45,'M'),
                               (3,124.1,5.2,23,'F'),
                               (4,144.5,5.9,33,'M'),
                               (5,133.2,5.7,54,'F'),
                               (3,124.1,5.2,23,'F'),
                               (5,129.2,5.3,42,'M')],["id","weight","height","age","gender"])
    # print the row counts before and after deduplication
    print("count of rows: {}".format(df.count()))
    print("count of distinct rows: {}".format(df.distinct().count()))
    

    count of rows: 7
    count of distinct rows: 6

    # drop fully duplicated rows
    df = df.dropDuplicates()
    df.show()
    

    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    |  5| 133.2|   5.7| 54|     F|
    |  5| 129.2|   5.3| 42|     M|
    |  1| 144.5|   5.9| 33|     M|
    |  4| 144.5|   5.9| 33|     M|
    |  2| 167.2|   5.4| 45|     M|
    |  3| 124.1|   5.2| 23|     F|
    +---+------+------+---+------+

    # check whether any rows are duplicated once the id column is excluded
    print("counts of ids: {}".format(df.count()))
    print("counts of distinct ids: {}".format(df.select([c for c in df.columns if c != "id"]).distinct().count()))
    

    counts of ids: 6
    counts of distinct ids: 5

    # two rows are identical except for their ids; drop one of them (a window-based alternative is sketched after the output below)
    df = df.dropDuplicates(subset = [c for c in df.columns if c != "id"])
    df.show()
    

    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    |  5| 133.2|   5.7| 54|     F|
    |  1| 144.5|   5.9| 33|     M|
    |  2| 167.2|   5.4| 45|     M|
    |  3| 124.1|   5.2| 23|     F|
    |  5| 129.2|   5.3| 42|     M|
    +---+------+------+---+------+
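
    dropDuplicates(subset=...) keeps an arbitrary row from each group of duplicates. When you need control over which row survives (for example, the one with the smallest id), a window-based sketch like the one below should work; the window spec and the temporary column rn are assumptions for illustration, not part of the original example.

    from pyspark.sql import Window
    import pyspark.sql.functions as fn

    # partition by every column except id, order by id, and keep the first row per group
    w = Window.partitionBy([c for c in df.columns if c != 'id']).orderBy('id')
    df_dedup = df.withColumn('rn', fn.row_number().over(w)) \
                 .filter('rn = 1') \
                 .drop('rn')
    df_dedup.show()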

    # OK. Now check whether any ids are duplicated
    import pyspark.sql.functions as fn  # Spark SQL functions
    
    df.agg(fn.count("id").alias("count"),
           fn.countDistinct("id").alias("distinct")).show()
    

    +-----+--------+
    |count|distinct|
    +-----+--------+
    |    5|       4|
    +-----+--------+

    # there are duplicate ids, so assign each row a new unique id
    df.withColumn("newId",fn.monotonically_increasing_id()).show()
    # withColumn adds a new column
    # monotonically_increasing_id generates unique, monotonically increasing ids
    

    +---+------+------+---+------+-------------+
    | id|weight|height|age|gender|        newId|
    +---+------+------+---+------+-------------+
    |  5| 133.2|   5.7| 54|     F|  25769803776|
    |  1| 144.5|   5.9| 33|     M| 171798691840|
    |  2| 167.2|   5.4| 45|     M| 592705486848|
    |  3| 124.1|   5.2| 23|     F|1236950581248|
    |  5| 129.2|   5.3| 42|     M|1365799600128|
    +---+------+------+---+------+-------------+
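
    Note that monotonically_increasing_id only guarantees unique, monotonically increasing values, not consecutive ones: the partition id lives in the upper bits, which is why the numbers above are so large. If consecutive ids are needed, a sketch using the RDD zipWithIndex API is one option; the reuse of the column name newId here is just for illustration.

    # zipWithIndex pairs each row with a consecutive 0-based index
    df_newid = df.rdd.zipWithIndex() \
                 .map(lambda pair: tuple(pair[0]) + (pair[1],)) \
                 .toDF(df.columns + ['newId'])
    df_newid.show()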

    Missing data

    df_miss = spark.createDataFrame([(1,143.5,5.6,28,'M',10000),
                                    (2,167.2,5.4,45,'M',None),
                                    (3,None,5.2,None,None,None),
                                    (4,144.5,5.9,33,'M',None),
                                    (5,133.2,5.7,54,'F',None),
                                    (6,124.1,5.2,None,'F',None),
                                    (7,129.2,5.3,42,'M',76000)],
                                   ['id','weight','height','age','gender','income'])
    # count the number of missing values in each row
    df_miss.rdd.map(lambda row: (row['id'],sum([c == None for c in row]))).collect()
    

    [(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
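
    The same per-row count can also be computed without dropping to the RDD API; a pure-DataFrame sketch using the fn alias imported above (the column name n_missing is an assumption):

    from functools import reduce
    from operator import add

    # fn.isnull(c) is a boolean column; cast it to int and sum across all columns
    df_miss.select(
        'id',
        reduce(add, [fn.isnull(c).cast('int') for c in df_miss.columns]).alias('n_missing')
    ).show()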

    # the row with id 3 is missing quite a lot; take a closer look at it
    df_miss.where('id == 3').show()
    

    +---+------+------+----+------+------+
    | id|weight|height| age|gender|income|
    +---+------+------+----+------+------+
    |  3|  null|   5.2|null|  null|  null|
    +---+------+------+----+------+------+

    # fraction of missing values per column (fn.count counts only non-null values)
    df_miss.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c + "_miss") for c in df_miss.columns]).show()
    

    +-------+------------------+-----------+------------------+------------------+------------------+
    |id_miss|       weight_miss|height_miss|          age_miss|       gender_miss|       income_miss|
    +-------+------------------+-----------+------------------+------------------+------------------+
    |    0.0|0.1428571428571429|        0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
    +-------+------------------+-----------+------------------+------------------+------------------+

    # income is mostly missing and essentially useless, so drop the column
    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != "income"])
    df_miss_no_income.show()
    

    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    |  1| 143.5|   5.6|  28|     M|
    |  2| 167.2|   5.4|  45|     M|
    |  3|  null|   5.2|null|  null|
    |  4| 144.5|   5.9|  33|     M|
    |  5| 133.2|   5.7|  54|     F|
    |  6| 124.1|   5.2|null|     F|
    |  7| 129.2|   5.3|  42|     M|
    +---+------+------+----+------+

    # some rows are also missing a lot of data; drop those rows
    # thresh=3 drops any row that has fewer than 3 non-null values
    df_miss_no_income.dropna(thresh=3).show()
    

    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    |  1| 143.5|   5.6|  28|     M|
    |  2| 167.2|   5.4|  45|     M|
    |  4| 144.5|   5.9|  33|     M|
    |  5| 133.2|   5.7|  54|     F|
    |  6| 124.1|   5.2|null|     F|
    |  7| 129.2|   5.3|  42|     M|
    +---+------+------+----+------+

    # drop any row that contains a null
    df_miss_no_income.dropna().show()
    

    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    |  1| 143.5|   5.6| 28|     M|
    |  2| 167.2|   5.4| 45|     M|
    |  4| 144.5|   5.9| 33|     M|
    |  5| 133.2|   5.7| 54|     F|
    |  7| 129.2|   5.3| 42|     M|
    +---+------+------+---+------+
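
    dropna also takes a subset argument when only certain columns should be considered; a small sketch (picking age is just an example):

    # drop only the rows whose age is null, ignoring nulls in other columns
    df_miss_no_income.dropna(subset=['age']).show()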

    # fill the null values
    means = df_miss_no_income.agg(*[fn.mean(c).alias(c) 
                                    for c in df_miss_no_income.columns if c != 'gender'])\
                                    .toPandas().to_dict('records')[0]
    means['gender'] = "missing"
    print(means)
    # df.fillna(dict) fills nulls; the dict maps each column name to the value used to fill it
    df_miss_no_income.fillna(means).show()
    

    {'age': 40.4, 'height': 5.471428571428571, 'gender': 'missing', 'weight': 140.28333333333333, 'id': 4.0}
    +---+------------------+------+---+-------+
    | id|            weight|height|age| gender|
    +---+------------------+------+---+-------+
    |  1|             143.5|   5.6| 28|      M|
    |  2|             167.2|   5.4| 45|      M|
    |  3|140.28333333333333|   5.2| 40|missing|
    |  4|             144.5|   5.9| 33|      M|
    |  5|             133.2|   5.7| 54|      F|
    |  6|             124.1|   5.2| 40|      F|
    |  7|             129.2|   5.3| 42|      M|
    +---+------------------+------+---+-------+
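
    For the numeric columns, pyspark.ml's Imputer can compute and apply the per-column mean (or median) in a single step; gender would still need a separate fillna. A sketch, with age cast to double first because some Spark versions require floating-point input columns:

    from pyspark.ml.feature import Imputer

    num_cols = ['weight', 'height', 'age']
    # cast the integer age column to double so Imputer accepts it
    df_for_impute = df_miss_no_income.withColumn('age', df_miss_no_income['age'].cast('double'))
    imputer = Imputer(inputCols=num_cols, outputCols=num_cols, strategy='mean')
    imputer.fit(df_for_impute).transform(df_for_impute).show()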

    Outliers

    df_outliers = spark.createDataFrame([(1,143.5,5.3,28),
                                        (2,154.2,5.5,45),
                                        (3,342.3,5.1,99),
                                        (4,144.5,5.5,33),
                                        (5,133.2,5.4,54),
                                        (6,124.1,5.1,21),
                                        (7,129.2,5.3,42)],["id","weight","height","age"])
    cols = ["weight","height","age"]
    # bounds stores the computed lower/upper limits for each column
    bounds = {}
    for col in cols:
        # compute the first and third quartiles (Q1 and Q3)
        quantiles = df_outliers.approxQuantile(col, [0.25,0.75], 0.05)
        # interquartile range
        IQR = quantiles[1] - quantiles[0]
        # inner fences: values outside these limits are treated as outliers
        bounds[col] = [quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR]
        
    print("bounds: ",bounds)
    # flag values that fall outside the inner fences as outliers
    outliers = df_outliers.select(*['id'] + \
                                  [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) )\
                                   .alias(c +"_o") for c in cols])
    outliers.show()
    

    bounds: {'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
    +---+--------+--------+-----+
    | id|weight_o|height_o|age_o|
    +---+--------+--------+-----+
    |  1|   false|   false|false|
    |  2|   false|   false|false|
    |  3|    true|   false| true|
    |  4|   false|   false|false|
    |  5|   false|   false|false|
    |  6|   false|   false|false|
    |  7|   false|   false|false|
    +---+--------+--------+-----+

    # pull out the outlier rows
    df_outliers = df_outliers.join(outliers,on = 'id')
    # do not write this join as df_outliers.join(outliers, df_outliers.id == outliers.id): the result would
    # contain two id columns, and the later select would fail with AnalysisException: "Reference 'id' is ambiguous
    df_outliers.show()
    

    +---+------+------+---+--------+--------+-----+
    | id|weight|height|age|weight_o|height_o|age_o|
    +---+------+------+---+--------+--------+-----+
    |  7| 129.2|   5.3| 42|   false|   false|false|
    |  6| 124.1|   5.1| 21|   false|   false|false|
    |  5| 133.2|   5.4| 54|   false|   false|false|
    |  1| 143.5|   5.3| 28|   false|   false|false|
    |  3| 342.3|   5.1| 99|    true|   false| true|
    |  2| 154.2|   5.5| 45|   false|   false|false|
    |  4| 144.5|   5.5| 33|   false|   false|false|
    +---+------+------+---+--------+--------+-----+

    df_outliers.filter('weight_o').select('id','weight').show()
    

    +---+------+
    | id|weight|
    +---+------+
    |  3| 342.3|
    +---+------+

    df_outliers.filter("age_o").select("id","age").show()
    

    +---+---+
    | id|age|
    +---+---+
    |  3| 99|
    +---+---+
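
    To keep only the non-outlier rows, require every *_o flag to be false; a short follow-up sketch (df_clean is just an illustrative name):

    # rows where none of the outlier flags are set
    df_clean = df_outliers.filter(~fn.col('weight_o') & ~fn.col('height_o') & ~fn.col('age_o')) \
                          .select('id', 'weight', 'height', 'age')
    df_clean.show()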
