PySpark NoteBook-2

作者: 7125messi | 来源:发表于2018-01-11 18:00 被阅读159次

Merge

1.Stacking Rows with Matching Columns(行堆积)

你可能在每个数据框中都有相同的列,只是想将其中一个堆叠在另一个之上。

from pyspark.sql import Row

row = Row("name", "pet", "count")

df1 = sc.parallelize([
    row("Sue", "cat", 16),
    row("Kim", "dog", 1),    
    row("Bob", "fish", 5)
    ]).toDF()

df2 = sc.parallelize([
    row("Fred", "cat", 2),
    row("Kate", "ant", 179),    
    row("Marc", "lizard", 5)
    ]).toDF()

df3 = sc.parallelize([
    row("Sarah", "shark", 3),
    row("Jason", "kids", 2),    
    row("Scott", "squirrel", 1)
    ]).toDF()
df_union = df1.unionAll(df2)
df_union.show()
+----+------+-----+
|name|   pet|count|
+----+------+-----+
| Sue|   cat|   16|
| Kim|   dog|    1|
| Bob|  fish|    5|
|Fred|   cat|    2|
|Kate|   ant|  179|
|Marc|lizard|    5|
+----+------+-----+

unionAll方法只允许我们一次堆栈两个数据帧。 如果有多个数据帧以这种方式堆叠,我们可以重复这样做,但是我们也可以使用一个辅助函数UDF来使它更容易。

标准的Python命令reduce将一个函数应用到项目列表中,以便将其“减少”到一个输出。 有了这个,你可以将任意数量的数据框传递给我们的UDF函数,它们会堆积在一起:

from pyspark.sql import DataFrame
from functools import reduce

def union_many(*dfs):    
    return reduce(DataFrame.unionAll, dfs)
df_union = union_many(df1, df2, df3)
df_union.show()
+-----+--------+-----+
| name|     pet|count|
+-----+--------+-----+
|  Sue|     cat|   16|
|  Kim|     dog|    1|
|  Bob|    fish|    5|
| Fred|     cat|    2|
| Kate|     ant|  179|
| Marc|  lizard|    5|
|Sarah|   shark|    3|
|Jason|    kids|    2|
|Scott|squirrel|    1|
+-----+--------+-----+

2.Merging Columns by Matching Rows

合并的另一种方式是通过跨行的某些键组合列。
在我们构建数据之后,有四种方法可以指定操作的逻辑:

row1 = Row("name", "pet", "count")
row2 = Row("name", "pet2", "count2")

df1 = sc.parallelize([
    row1("Sue", "cat", 16),
    row1("Kim", "dog", 1),    
    row1("Bob", "fish", 5),
    row1("Libuse", "horse", 1)
    ]).toDF()

df2 = sc.parallelize([
    row2("Sue", "eagle", 2),
    row2("Kim", "ant", 179),    
    row2("Bob", "lizard", 5),
    row2("Ferdinand", "bees", 23)
    ]).toDF()

首先我们要做一个“内连接”(inner join),它将在两个数据框中都合并的行和在其他所有行中合并的行。 这是默认的连接类型,所以如果你不希望显式地指明(如果显式几乎总是更好的话),那么how参数可以省略。 我们将在name列中的条目进行合并,您可以看到这是方法中的第二个参数; 如果合并发生在多个匹配值上,这也可以是一个list:

df1.join(df2, 'name', how='inner').show()
+----+----+-----+------+------+
|name| pet|count|  pet2|count2|
+----+----+-----+------+------+
| Sue| cat|   16| eagle|     2|
| Bob|fish|    5|lizard|     5|
| Kim| dog|    1|   ant|   179|
+----+----+-----+------+------+
df1.join(df2, 'name', how='outer').show()
df1.join(df2, 'name', how='left').show()

Missing-data

1 探索Null值

df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', 
                    header=False, inferSchema=True, sep='|', nullValue='')

请注意,在nullValue=''行中,空字符串可以被数据集使用的任何值替换 - 这是告诉Spark哪些值应该在数据框中转换为空值。

df.count()
3526154
df.dtypes
[('_c0', 'bigint'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'double'),
 ('_c4', 'double'),
 ('_c5', 'int'),
 ('_c6', 'int'),
 ('_c7', 'int'),
 ('_c8', 'string'),
 ('_c9', 'int'),
 ('_c10', 'string'),
 ('_c11', 'string'),
 ('_c12', 'int'),
 ('_c13', 'string'),
 ('_c14', 'string'),
 ('_c15', 'string'),
 ('_c16', 'string'),
 ('_c17', 'string'),
 ('_c18', 'string'),
 ('_c19', 'string'),
 ('_c20', 'string'),
 ('_c21', 'string'),
 ('_c22', 'string'),
 ('_c23', 'string'),
 ('_c24', 'string'),
 ('_c25', 'string'),
 ('_c26', 'int'),
 ('_c27', 'string')]
df.where( df['_c12'].isNull() ).count()
3510294
只能在列上调用isNull()方法,而不是在整个df上调用

利用UDF函数循环每列超出每列的空值个数

def count_nulls(df):
    null_counts = []          
    for col in df.dtypes:     
        cname = col[0]            
        ctype = col[1]        
        if ctype != 'string': 
            nulls = df.where( df[cname].isNull() ).count()
            result = tuple([cname, nulls])   
            null_counts.append(result)      
    return null_counts

null_counts = count_nulls(df)
null_counts
[('_c0', 0),
 ('_c3', 0),
 ('_c4', 1945752),
 ('_c5', 0),
 ('_c6', 0),
 ('_c7', 1),
 ('_c9', 0),
 ('_c12', 3510294),
 ('_c26', 3526153)]

2 删除空值

现在我们可以对'null'值做三件事,即我们知道数据框中的内容。
We can ignore them, we can drop them, or we can replace them。
请记住,PySpark数据框是不可变的,所以我们实际上不能改变原始数据集。 所有的操作都会返回一个全新的数据框,尽管我们可以用df = df.some_operation()来覆盖现有的数据框。

df_drops = df.dropna(how='all', subset=['_c4', '_c12', '_c26'])
df_drops.count()

df.dropna()方法在这里有两个参数:'how'可以等于'any'或者'all' 如果any的值为null。
subset参数需要一个你想查找null值的列的列表。 它实际上并不是数据框的子集; 它只是检查这三列,然后删除整个数据帧的行,如果该子集满足条件。 如果它应该检查所有的列为null,这可以被关闭。
“dropna()”可以使用第三个参数。 在“thresh”参数设置一个阈值之前,一个行中的“空”条目将其丢弃。 它被设置为一个整数,用于指定该行必须具有多少个非空参数。 如果它小于这个数字就会丢弃该行。**如果你像下面那样指定了这个参数,它将返回一个数据帧,其中指定子集中少于2个非空值的行被删除:

所以我们可以在上面看到,一旦我们删除'_c4,_c12和_c26列中所有具有null`值的行,我们在原来的3,526,154行中就剩下1,580,403行 在整个数据帧上。

df_drops2 = df.dropna(thresh=2, subset=['_c4', '_c12', '_c26'])
df_drops2.count()

3 替换空值

df_fill = df.fillna(0, subset=['_c12'])
df_fill.where( df_fill['_c12'].isNull() ).count()
0

我们看到它取代了之前发现的所有3,510,294个空值。 “fillna()”中的第一项可以是任何类型和任何值,如果填充应用于所有列,子集列表可以保持不变(尽管确保dtype与已经在该列中的内容一致)。 请注意,df.replace(a,b)做同样的事情,只有你指定a作为被替换的值,b作为替换。 它也接受可选的子集列表,但不利用优化的空值处理。

相关文章

网友评论

    本文标题:PySpark NoteBook-2

    本文链接:https://www.haomeiwen.com/subject/krltoxtx.html