df = spark.read.csv('s3://ui-spark-social-science-public/data/diamonds_nulls.csv',\
inferSchema=True, \
header=True, \
sep=',', \
nullValue='')
df.show()
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat| cut|color|clarity|depth|table|price| x| y| z|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43|
| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31|
| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31|
| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63|
| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75|
| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48|
| 0.24|Very Good| I| VVS1| 62.3| 57.0| 336|3.95|3.98|2.47|
| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53|
| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49|
| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39|
| 0.3| Good| J| SI1| 64.0| 55.0| 339|4.25|4.28|2.73|
| 0.23| Ideal| J| VS1| 62.8| 56.0| 340|3.93| 3.9|2.46|
| 0.22| Premium| F| SI1| 60.4| 61.0| 342|3.88|3.84|2.33|
| 0.31| Ideal| J| SI2| 62.2| 54.0| 344|4.35|4.37|2.71|
| 0.2| Premium| E| SI2| 60.2| 62.0| 345|3.79|3.75|2.27|
| 0.32| Premium| E| I1| 60.9| 58.0| 345|4.38|4.42|2.68|
| 0.3| Ideal| I| SI2| 62.0| 54.0| 348|4.31|4.34|2.68|
| 0.3| Good| J| SI1| 63.4| 54.0| 351|4.23|4.29| 2.7|
| 0.3| Good| J| SI1| 63.8| 56.0| 351|4.23|4.26|2.71|
| 0.3|Very Good| J| SI1| 62.7| 59.0| 351|4.21|4.27|2.66|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 20 rows
df.where(df['price'].isNull()).show(50)
1.Defining a Window
窗口的第一步是定义窗口参数。我们通过结合三个元素来完成这个操作:分组(partitionBy),排序(orderBy)和范围(rowsBetween)。 然后我们定义的窗口被分配给一个变量,我们将用它来执行计算。
from pyspark.sql import Window
window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
window
<pyspark.sql.window.WindowSpec at 0x7faf3ca68690>
第一部分'partionBy('cut','clarity')有点误导性地命名,因为它与Spark中的* partitions *无关,它们是分布式数据的一部分,大致类似于集群内的单个计算机。 它与groupBy的关系更为密切。 它告诉PySpark窗口应该只在每个“cut”和“clarity”列中进行计算。
第二部分orderBy('price')只是在每个partitionBy列内按价格排序数据。
最后,rowsBetween(-3,3)指定窗口的大小。 在这种情况下,每个窗口中包含七行 - 当前行加上前三行和后三行。
2.Operations Over a Window
下一步是将这个窗口应用于一个操作,我们可以使用over
方法来做这个操作。 这里我们将使用mean
作为我们的聚合器,但是你可以用任何有效的聚合函数来做到这一点。
from pyspark.sql.functions import mean
moving_avg = mean(df['price']).over(window)
这创建了一个列对象,它包含创建数据所需的一组SQL指令。 PySpark能够为大多数操作获取SQL格式化的指令,在窗口化的情况下,SQL是Python代码的基础。
请记住,PySpark数据框是不可变的,所以我们不能只填写缺失的值。相反,我们必须创建一个新的列,然后改写数据框以包含它:
df = df.withColumn('moving_avg', moving_avg)
df.show()
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
|carat| cut|color|clarity|depth|table|price| x| y| z| moving_avg|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
| 0.73|Premium| F| VS2| 62.5| 57.0| null|5.75| 5.7|3.58| 356.0|
| 0.29|Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 358.75|
| 0.2|Premium| E| VS2| 59.8| 62.0| 367|3.79|3.77|2.26| 360.4|
| 0.2|Premium| E| VS2| 59.0| 60.0| 367|3.81|3.78|2.24| 361.5|
| 0.2|Premium| E| VS2| 61.1| 59.0| 367|3.81|3.78|2.32| 362.2857142857143|
| 0.2|Premium| E| VS2| 59.7| 62.0| 367|3.84| 3.8|2.28| 367.0|
| 0.2|Premium| F| VS2| 62.6| 59.0| 367|3.73|3.71|2.33|367.14285714285717|
| 0.2|Premium| D| VS2| 62.3| 60.0| 367|3.73|3.68|2.31| 367.2857142857143|
| 0.2|Premium| D| VS2| 61.7| 60.0| 367|3.77|3.72|2.31|369.14285714285717|
| 0.3|Premium| J| VS2| 62.2| 58.0| 368|4.28| 4.3|2.67| 371.0|
| 0.3|Premium| J| VS2| 60.6| 59.0| 368|4.34|4.38|2.64| 373.7142857142857|
| 0.31|Premium| J| VS2| 62.5| 60.0| 380|4.31|4.36|2.71|376.42857142857144|
| 0.31|Premium| J| VS2| 62.4| 60.0| 380|4.29|4.33|2.69|379.14285714285717|
| 0.21|Premium| E| VS2| 60.5| 59.0| 386|3.87|3.83|2.33| 381.7142857142857|
| 0.21|Premium| E| VS2| 59.6| 56.0| 386|3.93|3.89|2.33| 384.2857142857143|
| 0.21|Premium| D| VS2| 61.6| 59.0| 386|3.82|3.78|2.34|385.14285714285717|
| 0.21|Premium| D| VS2| 60.6| 60.0| 386|3.85|3.81|2.32| 387.0|
| 0.21|Premium| D| VS2| 59.1| 62.0| 386|3.89|3.86|2.29| 388.0|
| 0.21|Premium| D| VS2| 58.3| 59.0| 386|3.96|3.93| 2.3|389.57142857142856|
| 0.32|Premium| J| VS2| 61.9| 58.0| 393|4.35|4.38| 2.7|392.14285714285717|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
3.Imputation
由于不可变性,我们将重新调用另一个列的数据框,如果它不是null
**,那么将从price
列中得到值,并且填入''moving_avg列中的值 如果**是`null` **。 我们将使用 PySpark内置的
when ... otherwise` *条件来做到这一点。然后我们把这个条件写入一个名为“imputed”的新列。
from pyspark.sql.functions import when, col
def replace_null(orig, ma):
return when(orig.isNull(), ma).otherwise(orig)
df_new = df.withColumn('imputed', replace_null(col('price'), col('moving_avg')) )
df_new.show()
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
|carat| cut|color|clarity|depth|table|price| x| y| z| moving_avg|imputed|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
| 0.73|Premium| F| VS2| 62.5| 57.0| null|5.75| 5.7|3.58| 356.0| 356.0|
| 0.29|Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 358.75| 334.0|
| 0.2|Premium| E| VS2| 59.8| 62.0| 367|3.79|3.77|2.26| 360.4| 367.0|
| 0.2|Premium| E| VS2| 59.0| 60.0| 367|3.81|3.78|2.24| 361.5| 367.0|
| 0.2|Premium| E| VS2| 61.1| 59.0| 367|3.81|3.78|2.32| 362.2857142857143| 367.0|
| 0.2|Premium| E| VS2| 59.7| 62.0| 367|3.84| 3.8|2.28| 367.0| 367.0|
| 0.2|Premium| F| VS2| 62.6| 59.0| 367|3.73|3.71|2.33|367.14285714285717| 367.0|
| 0.2|Premium| D| VS2| 62.3| 60.0| 367|3.73|3.68|2.31| 367.2857142857143| 367.0|
| 0.2|Premium| D| VS2| 61.7| 60.0| 367|3.77|3.72|2.31|369.14285714285717| 367.0|
| 0.3|Premium| J| VS2| 62.2| 58.0| 368|4.28| 4.3|2.67| 371.0| 368.0|
| 0.3|Premium| J| VS2| 60.6| 59.0| 368|4.34|4.38|2.64| 373.7142857142857| 368.0|
| 0.31|Premium| J| VS2| 62.5| 60.0| 380|4.31|4.36|2.71|376.42857142857144| 380.0|
| 0.31|Premium| J| VS2| 62.4| 60.0| 380|4.29|4.33|2.69|379.14285714285717| 380.0|
| 0.21|Premium| E| VS2| 60.5| 59.0| 386|3.87|3.83|2.33| 381.7142857142857| 386.0|
| 0.21|Premium| E| VS2| 59.6| 56.0| 386|3.93|3.89|2.33| 384.2857142857143| 386.0|
| 0.21|Premium| D| VS2| 61.6| 59.0| 386|3.82|3.78|2.34|385.14285714285717| 386.0|
| 0.21|Premium| D| VS2| 60.6| 60.0| 386|3.85|3.81|2.32| 387.0| 386.0|
| 0.21|Premium| D| VS2| 59.1| 62.0| 386|3.89|3.86|2.29| 388.0| 386.0|
| 0.21|Premium| D| VS2| 58.3| 59.0| 386|3.96|3.93| 2.3|389.57142857142856| 386.0|
| 0.32|Premium| J| VS2| 61.9| 58.0| 393|4.35|4.38| 2.7|392.14285714285717| 393.0|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
only showing top 20 rows
网友评论