PySpark NoteBook-3

作者: 7125messi | 来源:发表于2018-01-11 21:00 被阅读124次
df = spark.read.csv('s3://ui-spark-social-science-public/data/diamonds_nulls.csv',\
inferSchema=True, \
header=True, \
sep=',', \
nullValue='')
df.show()
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
|  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
| 0.23|    Ideal|    J|    VS1| 62.8| 56.0|  340|3.93| 3.9|2.46|
| 0.22|  Premium|    F|    SI1| 60.4| 61.0|  342|3.88|3.84|2.33|
| 0.31|    Ideal|    J|    SI2| 62.2| 54.0|  344|4.35|4.37|2.71|
|  0.2|  Premium|    E|    SI2| 60.2| 62.0|  345|3.79|3.75|2.27|
| 0.32|  Premium|    E|     I1| 60.9| 58.0|  345|4.38|4.42|2.68|
|  0.3|    Ideal|    I|    SI2| 62.0| 54.0|  348|4.31|4.34|2.68|
|  0.3|     Good|    J|    SI1| 63.4| 54.0|  351|4.23|4.29| 2.7|
|  0.3|     Good|    J|    SI1| 63.8| 56.0|  351|4.23|4.26|2.71|
|  0.3|Very Good|    J|    SI1| 62.7| 59.0|  351|4.21|4.27|2.66|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 20 rows
df.where(df['price'].isNull()).show(50)

1.Defining a Window

窗口的第一步是定义窗口参数。我们通过结合三个元素来完成这个操作:分组(partitionBy),排序(orderBy)和范围(rowsBetween)。 然后我们定义的窗口被分配给一个变量,我们将用它来执行计算。

from pyspark.sql import Window
window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
window
<pyspark.sql.window.WindowSpec at 0x7faf3ca68690>

第一部分'partionBy('cut','clarity')有点误导性地命名,因为它与Spark中的* partitions *无关,它们是分布式数据的一部分,大致类似于集群内的单个计算机。 它与groupBy的关系更为密切。 它告诉PySpark窗口应该只在每个“cut”和“clarity”列中进行计算。

第二部分orderBy('price')只是在每个partitionBy列内按价格排序数据。

最后,rowsBetween(-3,3)指定窗口的大小。 在这种情况下,每个窗口中包含七行 - 当前行加上前三行和后三行。

2.Operations Over a Window

下一步是将这个窗口应用于一个操作,我们可以使用over方法来做这个操作。 这里我们将使用mean作为我们的聚合器,但是你可以用任何有效的聚合函数来做到这一点。

from pyspark.sql.functions import mean
moving_avg = mean(df['price']).over(window)

这创建了一个列对象,它包含创建数据所需的一组SQL指令。 PySpark能够为大多数操作获取SQL格式化的指令,在窗口化的情况下,SQL是Python代码的基础。

请记住,PySpark数据框是不可变的,所以我们不能只填写缺失的值。相反,我们必须创建一个新的列,然后改写数据框以包含它:

df = df.withColumn('moving_avg', moving_avg)
df.show()
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|        moving_avg|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
| 0.73|Premium|    F|    VS2| 62.5| 57.0| null|5.75| 5.7|3.58|             356.0|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|            358.75|
|  0.2|Premium|    E|    VS2| 59.8| 62.0|  367|3.79|3.77|2.26|             360.4|
|  0.2|Premium|    E|    VS2| 59.0| 60.0|  367|3.81|3.78|2.24|             361.5|
|  0.2|Premium|    E|    VS2| 61.1| 59.0|  367|3.81|3.78|2.32| 362.2857142857143|
|  0.2|Premium|    E|    VS2| 59.7| 62.0|  367|3.84| 3.8|2.28|             367.0|
|  0.2|Premium|    F|    VS2| 62.6| 59.0|  367|3.73|3.71|2.33|367.14285714285717|
|  0.2|Premium|    D|    VS2| 62.3| 60.0|  367|3.73|3.68|2.31| 367.2857142857143|
|  0.2|Premium|    D|    VS2| 61.7| 60.0|  367|3.77|3.72|2.31|369.14285714285717|
|  0.3|Premium|    J|    VS2| 62.2| 58.0|  368|4.28| 4.3|2.67|             371.0|
|  0.3|Premium|    J|    VS2| 60.6| 59.0|  368|4.34|4.38|2.64| 373.7142857142857|
| 0.31|Premium|    J|    VS2| 62.5| 60.0|  380|4.31|4.36|2.71|376.42857142857144|
| 0.31|Premium|    J|    VS2| 62.4| 60.0|  380|4.29|4.33|2.69|379.14285714285717|
| 0.21|Premium|    E|    VS2| 60.5| 59.0|  386|3.87|3.83|2.33| 381.7142857142857|
| 0.21|Premium|    E|    VS2| 59.6| 56.0|  386|3.93|3.89|2.33| 384.2857142857143|
| 0.21|Premium|    D|    VS2| 61.6| 59.0|  386|3.82|3.78|2.34|385.14285714285717|
| 0.21|Premium|    D|    VS2| 60.6| 60.0|  386|3.85|3.81|2.32|             387.0|
| 0.21|Premium|    D|    VS2| 59.1| 62.0|  386|3.89|3.86|2.29|             388.0|
| 0.21|Premium|    D|    VS2| 58.3| 59.0|  386|3.96|3.93| 2.3|389.57142857142856|
| 0.32|Premium|    J|    VS2| 61.9| 58.0|  393|4.35|4.38| 2.7|392.14285714285717|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+

3.Imputation

由于不可变性,我们将重新调用另一个列的数据框,如果它不是null **,那么将从price列中得到值,并且填入''moving_avg列中的值 如果**是`null` **。 我们将使用 PySpark内置的when ... otherwise` *条件来做到这一点。然后我们把这个条件写入一个名为“imputed”的新列。

from pyspark.sql.functions import when, col

def replace_null(orig, ma):
    return when(orig.isNull(), ma).otherwise(orig)
df_new = df.withColumn('imputed', replace_null(col('price'), col('moving_avg')) )
df_new.show()
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|        moving_avg|imputed|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
| 0.73|Premium|    F|    VS2| 62.5| 57.0| null|5.75| 5.7|3.58|             356.0|  356.0|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|            358.75|  334.0|
|  0.2|Premium|    E|    VS2| 59.8| 62.0|  367|3.79|3.77|2.26|             360.4|  367.0|
|  0.2|Premium|    E|    VS2| 59.0| 60.0|  367|3.81|3.78|2.24|             361.5|  367.0|
|  0.2|Premium|    E|    VS2| 61.1| 59.0|  367|3.81|3.78|2.32| 362.2857142857143|  367.0|
|  0.2|Premium|    E|    VS2| 59.7| 62.0|  367|3.84| 3.8|2.28|             367.0|  367.0|
|  0.2|Premium|    F|    VS2| 62.6| 59.0|  367|3.73|3.71|2.33|367.14285714285717|  367.0|
|  0.2|Premium|    D|    VS2| 62.3| 60.0|  367|3.73|3.68|2.31| 367.2857142857143|  367.0|
|  0.2|Premium|    D|    VS2| 61.7| 60.0|  367|3.77|3.72|2.31|369.14285714285717|  367.0|
|  0.3|Premium|    J|    VS2| 62.2| 58.0|  368|4.28| 4.3|2.67|             371.0|  368.0|
|  0.3|Premium|    J|    VS2| 60.6| 59.0|  368|4.34|4.38|2.64| 373.7142857142857|  368.0|
| 0.31|Premium|    J|    VS2| 62.5| 60.0|  380|4.31|4.36|2.71|376.42857142857144|  380.0|
| 0.31|Premium|    J|    VS2| 62.4| 60.0|  380|4.29|4.33|2.69|379.14285714285717|  380.0|
| 0.21|Premium|    E|    VS2| 60.5| 59.0|  386|3.87|3.83|2.33| 381.7142857142857|  386.0|
| 0.21|Premium|    E|    VS2| 59.6| 56.0|  386|3.93|3.89|2.33| 384.2857142857143|  386.0|
| 0.21|Premium|    D|    VS2| 61.6| 59.0|  386|3.82|3.78|2.34|385.14285714285717|  386.0|
| 0.21|Premium|    D|    VS2| 60.6| 60.0|  386|3.85|3.81|2.32|             387.0|  386.0|
| 0.21|Premium|    D|    VS2| 59.1| 62.0|  386|3.89|3.86|2.29|             388.0|  386.0|
| 0.21|Premium|    D|    VS2| 58.3| 59.0|  386|3.96|3.93| 2.3|389.57142857142856|  386.0|
| 0.32|Premium|    J|    VS2| 61.9| 58.0|  393|4.35|4.38| 2.7|392.14285714285717|  393.0|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+-------+
only showing top 20 rows

相关文章

网友评论

    本文标题:PySpark NoteBook-3

    本文链接:https://www.haomeiwen.com/subject/uiwtoxtx.html