Merge
1.Stacking Rows with Matching Columns(行堆积)
你可能在每个数据框中都有相同的列,只是想将其中一个堆叠在另一个之上。
from pyspark.sql import Row
row = Row("name", "pet", "count")
df1 = sc.parallelize([
row("Sue", "cat", 16),
row("Kim", "dog", 1),
row("Bob", "fish", 5)
]).toDF()
df2 = sc.parallelize([
row("Fred", "cat", 2),
row("Kate", "ant", 179),
row("Marc", "lizard", 5)
]).toDF()
df3 = sc.parallelize([
row("Sarah", "shark", 3),
row("Jason", "kids", 2),
row("Scott", "squirrel", 1)
]).toDF()
df_union = df1.unionAll(df2)
df_union.show()
+----+------+-----+
|name| pet|count|
+----+------+-----+
| Sue| cat| 16|
| Kim| dog| 1|
| Bob| fish| 5|
|Fred| cat| 2|
|Kate| ant| 179|
|Marc|lizard| 5|
+----+------+-----+
unionAll方法只允许我们一次堆栈两个数据帧。 如果有多个数据帧以这种方式堆叠,我们可以重复这样做,但是我们也可以使用一个辅助函数UDF来使它更容易。
标准的Python命令reduce将一个函数应用到项目列表中,以便将其“减少”到一个输出。 有了这个,你可以将任意数量的数据框传递给我们的UDF函数,它们会堆积在一起:
from pyspark.sql import DataFrame
from functools import reduce
def union_many(*dfs):
return reduce(DataFrame.unionAll, dfs)
df_union = union_many(df1, df2, df3)
df_union.show()
+-----+--------+-----+
| name| pet|count|
+-----+--------+-----+
| Sue| cat| 16|
| Kim| dog| 1|
| Bob| fish| 5|
| Fred| cat| 2|
| Kate| ant| 179|
| Marc| lizard| 5|
|Sarah| shark| 3|
|Jason| kids| 2|
|Scott|squirrel| 1|
+-----+--------+-----+
2.Merging Columns by Matching Rows
合并的另一种方式是通过跨行的某些键组合列。
在我们构建数据之后,有四种方法可以指定操作的逻辑:
row1 = Row("name", "pet", "count")
row2 = Row("name", "pet2", "count2")
df1 = sc.parallelize([
row1("Sue", "cat", 16),
row1("Kim", "dog", 1),
row1("Bob", "fish", 5),
row1("Libuse", "horse", 1)
]).toDF()
df2 = sc.parallelize([
row2("Sue", "eagle", 2),
row2("Kim", "ant", 179),
row2("Bob", "lizard", 5),
row2("Ferdinand", "bees", 23)
]).toDF()
首先我们要做一个“内连接”(inner join),它将在两个数据框中都合并的行和在其他所有行中合并的行。 这是默认的连接类型,所以如果你不希望显式地指明(如果显式几乎总是更好的话),那么how参数可以省略。 我们将在name列中的条目进行合并,您可以看到这是方法中的第二个参数; 如果合并发生在多个匹配值上,这也可以是一个list:
df1.join(df2, 'name', how='inner').show()
+----+----+-----+------+------+
|name| pet|count| pet2|count2|
+----+----+-----+------+------+
| Sue| cat| 16| eagle| 2|
| Bob|fish| 5|lizard| 5|
| Kim| dog| 1| ant| 179|
+----+----+-----+------+------+
df1.join(df2, 'name', how='outer').show()
df1.join(df2, 'name', how='left').show()
Missing-data
1 探索Null值
df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt',
header=False, inferSchema=True, sep='|', nullValue='')
请注意,在nullValue=''行中,空字符串可以被数据集使用的任何值替换 - 这是告诉Spark哪些值应该在数据框中转换为空值。
df.count()
3526154
df.dtypes
[('_c0', 'bigint'),
('_c1', 'string'),
('_c2', 'string'),
('_c3', 'double'),
('_c4', 'double'),
('_c5', 'int'),
('_c6', 'int'),
('_c7', 'int'),
('_c8', 'string'),
('_c9', 'int'),
('_c10', 'string'),
('_c11', 'string'),
('_c12', 'int'),
('_c13', 'string'),
('_c14', 'string'),
('_c15', 'string'),
('_c16', 'string'),
('_c17', 'string'),
('_c18', 'string'),
('_c19', 'string'),
('_c20', 'string'),
('_c21', 'string'),
('_c22', 'string'),
('_c23', 'string'),
('_c24', 'string'),
('_c25', 'string'),
('_c26', 'int'),
('_c27', 'string')]
df.where( df['_c12'].isNull() ).count()
3510294
只能在列上调用isNull()方法,而不是在整个df上调用
利用UDF函数循环每列超出每列的空值个数
def count_nulls(df):
null_counts = []
for col in df.dtypes:
cname = col[0]
ctype = col[1]
if ctype != 'string':
nulls = df.where( df[cname].isNull() ).count()
result = tuple([cname, nulls])
null_counts.append(result)
return null_counts
null_counts = count_nulls(df)
null_counts
[('_c0', 0),
('_c3', 0),
('_c4', 1945752),
('_c5', 0),
('_c6', 0),
('_c7', 1),
('_c9', 0),
('_c12', 3510294),
('_c26', 3526153)]
2 删除空值
现在我们可以对'null'值做三件事,即我们知道数据框中的内容。
We can ignore them, we can drop them, or we can replace them。
请记住,PySpark数据框是不可变的,所以我们实际上不能改变原始数据集。 所有的操作都会返回一个全新的数据框,尽管我们可以用df = df.some_operation()来覆盖现有的数据框。
df_drops = df.dropna(how='all', subset=['_c4', '_c12', '_c26'])
df_drops.count()
df.dropna()方法在这里有两个参数:'how'可以等于'any'或者'all' 如果any的值为null。
subset参数需要一个你想查找null值的列的列表。 它实际上并不是数据框的子集; 它只是检查这三列,然后删除整个数据帧的行,如果该子集满足条件。 如果它应该检查所有的列为null,这可以被关闭。
“dropna()”可以使用第三个参数。 在“thresh”参数设置一个阈值之前,一个行中的“空”条目将其丢弃。 它被设置为一个整数,用于指定该行必须具有多少个非空参数。 如果它小于这个数字就会丢弃该行。**如果你像下面那样指定了这个参数,它将返回一个数据帧,其中指定子集中少于2个非空值的行被删除:
所以我们可以在上面看到,一旦我们删除'_c4,_c12和_c26列中所有具有null`值的行,我们在原来的3,526,154行中就剩下1,580,403行 在整个数据帧上。
df_drops2 = df.dropna(thresh=2, subset=['_c4', '_c12', '_c26'])
df_drops2.count()
3 替换空值
df_fill = df.fillna(0, subset=['_c12'])
df_fill.where( df_fill['_c12'].isNull() ).count()
0
我们看到它取代了之前发现的所有3,510,294个空值。 “fillna()”中的第一项可以是任何类型和任何值,如果填充应用于所有列,子集列表可以保持不变(尽管确保dtype与已经在该列中的内容一致)。 请注意,df.replace(a,b)做同样的事情,只有你指定a作为被替换的值,b作为替换。 它也接受可选的子集列表,但不利用优化的空值处理。
网友评论