PySpark NoteBook-2

作者: 7125messi | 来源:发表于2018-01-11 18:00 被阅读159次

    Merge

    1.Stacking Rows with Matching Columns(行堆积)

    你可能在每个数据框中都有相同的列,只是想将其中一个堆叠在另一个之上。

    from pyspark.sql import Row
    
    row = Row("name", "pet", "count")
    
    df1 = sc.parallelize([
        row("Sue", "cat", 16),
        row("Kim", "dog", 1),    
        row("Bob", "fish", 5)
        ]).toDF()
    
    df2 = sc.parallelize([
        row("Fred", "cat", 2),
        row("Kate", "ant", 179),    
        row("Marc", "lizard", 5)
        ]).toDF()
    
    df3 = sc.parallelize([
        row("Sarah", "shark", 3),
        row("Jason", "kids", 2),    
        row("Scott", "squirrel", 1)
        ]).toDF()
    
    df_union = df1.unionAll(df2)
    df_union.show()
    +----+------+-----+
    |name|   pet|count|
    +----+------+-----+
    | Sue|   cat|   16|
    | Kim|   dog|    1|
    | Bob|  fish|    5|
    |Fred|   cat|    2|
    |Kate|   ant|  179|
    |Marc|lizard|    5|
    +----+------+-----+
    

    unionAll方法只允许我们一次堆栈两个数据帧。 如果有多个数据帧以这种方式堆叠,我们可以重复这样做,但是我们也可以使用一个辅助函数UDF来使它更容易。

    标准的Python命令reduce将一个函数应用到项目列表中,以便将其“减少”到一个输出。 有了这个,你可以将任意数量的数据框传递给我们的UDF函数,它们会堆积在一起:

    from pyspark.sql import DataFrame
    from functools import reduce
    
    def union_many(*dfs):    
        return reduce(DataFrame.unionAll, dfs)
    df_union = union_many(df1, df2, df3)
    df_union.show()
    +-----+--------+-----+
    | name|     pet|count|
    +-----+--------+-----+
    |  Sue|     cat|   16|
    |  Kim|     dog|    1|
    |  Bob|    fish|    5|
    | Fred|     cat|    2|
    | Kate|     ant|  179|
    | Marc|  lizard|    5|
    |Sarah|   shark|    3|
    |Jason|    kids|    2|
    |Scott|squirrel|    1|
    +-----+--------+-----+
    

    2.Merging Columns by Matching Rows

    合并的另一种方式是通过跨行的某些键组合列。
    在我们构建数据之后,有四种方法可以指定操作的逻辑:

    row1 = Row("name", "pet", "count")
    row2 = Row("name", "pet2", "count2")
    
    df1 = sc.parallelize([
        row1("Sue", "cat", 16),
        row1("Kim", "dog", 1),    
        row1("Bob", "fish", 5),
        row1("Libuse", "horse", 1)
        ]).toDF()
    
    df2 = sc.parallelize([
        row2("Sue", "eagle", 2),
        row2("Kim", "ant", 179),    
        row2("Bob", "lizard", 5),
        row2("Ferdinand", "bees", 23)
        ]).toDF()
    

    首先我们要做一个“内连接”(inner join),它将在两个数据框中都合并的行和在其他所有行中合并的行。 这是默认的连接类型,所以如果你不希望显式地指明(如果显式几乎总是更好的话),那么how参数可以省略。 我们将在name列中的条目进行合并,您可以看到这是方法中的第二个参数; 如果合并发生在多个匹配值上,这也可以是一个list:

    df1.join(df2, 'name', how='inner').show()
    +----+----+-----+------+------+
    |name| pet|count|  pet2|count2|
    +----+----+-----+------+------+
    | Sue| cat|   16| eagle|     2|
    | Bob|fish|    5|lizard|     5|
    | Kim| dog|    1|   ant|   179|
    +----+----+-----+------+------+
    df1.join(df2, 'name', how='outer').show()
    df1.join(df2, 'name', how='left').show()
    

    Missing-data

    1 探索Null值

    df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', 
                        header=False, inferSchema=True, sep='|', nullValue='')
    

    请注意,在nullValue=''行中,空字符串可以被数据集使用的任何值替换 - 这是告诉Spark哪些值应该在数据框中转换为空值。

    df.count()
    3526154
    
    df.dtypes
    [('_c0', 'bigint'),
     ('_c1', 'string'),
     ('_c2', 'string'),
     ('_c3', 'double'),
     ('_c4', 'double'),
     ('_c5', 'int'),
     ('_c6', 'int'),
     ('_c7', 'int'),
     ('_c8', 'string'),
     ('_c9', 'int'),
     ('_c10', 'string'),
     ('_c11', 'string'),
     ('_c12', 'int'),
     ('_c13', 'string'),
     ('_c14', 'string'),
     ('_c15', 'string'),
     ('_c16', 'string'),
     ('_c17', 'string'),
     ('_c18', 'string'),
     ('_c19', 'string'),
     ('_c20', 'string'),
     ('_c21', 'string'),
     ('_c22', 'string'),
     ('_c23', 'string'),
     ('_c24', 'string'),
     ('_c25', 'string'),
     ('_c26', 'int'),
     ('_c27', 'string')]
    
    df.where( df['_c12'].isNull() ).count()
    3510294
    只能在列上调用isNull()方法,而不是在整个df上调用
    

    利用UDF函数循环每列超出每列的空值个数

    def count_nulls(df):
        null_counts = []          
        for col in df.dtypes:     
            cname = col[0]            
            ctype = col[1]        
            if ctype != 'string': 
                nulls = df.where( df[cname].isNull() ).count()
                result = tuple([cname, nulls])   
                null_counts.append(result)      
        return null_counts
    
    null_counts = count_nulls(df)
    null_counts
    [('_c0', 0),
     ('_c3', 0),
     ('_c4', 1945752),
     ('_c5', 0),
     ('_c6', 0),
     ('_c7', 1),
     ('_c9', 0),
     ('_c12', 3510294),
     ('_c26', 3526153)]
    

    2 删除空值

    现在我们可以对'null'值做三件事,即我们知道数据框中的内容。
    We can ignore them, we can drop them, or we can replace them。
    请记住,PySpark数据框是不可变的,所以我们实际上不能改变原始数据集。 所有的操作都会返回一个全新的数据框,尽管我们可以用df = df.some_operation()来覆盖现有的数据框。

    df_drops = df.dropna(how='all', subset=['_c4', '_c12', '_c26'])
    df_drops.count()
    

    df.dropna()方法在这里有两个参数:'how'可以等于'any'或者'all' 如果any的值为null。
    subset参数需要一个你想查找null值的列的列表。 它实际上并不是数据框的子集; 它只是检查这三列,然后删除整个数据帧的行,如果该子集满足条件。 如果它应该检查所有的列为null,这可以被关闭。
    “dropna()”可以使用第三个参数。 在“thresh”参数设置一个阈值之前,一个行中的“空”条目将其丢弃。 它被设置为一个整数,用于指定该行必须具有多少个非空参数。 如果它小于这个数字就会丢弃该行。**如果你像下面那样指定了这个参数,它将返回一个数据帧,其中指定子集中少于2个非空值的行被删除:

    所以我们可以在上面看到,一旦我们删除'_c4,_c12和_c26列中所有具有null`值的行,我们在原来的3,526,154行中就剩下1,580,403行 在整个数据帧上。

    df_drops2 = df.dropna(thresh=2, subset=['_c4', '_c12', '_c26'])
    df_drops2.count()
    

    3 替换空值

    df_fill = df.fillna(0, subset=['_c12'])
    df_fill.where( df_fill['_c12'].isNull() ).count()
    0
    

    我们看到它取代了之前发现的所有3,510,294个空值。 “fillna()”中的第一项可以是任何类型和任何值,如果填充应用于所有列,子集列表可以保持不变(尽管确保dtype与已经在该列中的内容一致)。 请注意,df.replace(a,b)做同样的事情,只有你指定a作为被替换的值,b作为替换。 它也接受可选的子集列表,但不利用优化的空值处理。

    相关文章

      网友评论

        本文标题:PySpark NoteBook-2

        本文链接:https://www.haomeiwen.com/subject/krltoxtx.html