
Some simple, commonly used PySpark functions and methods

Author: 井底蛙蛙呱呱呱 | Published 2019-10-21 16:44

    1. Convert a string or numeric column to a vector/array

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
    
    # Numeric values can be converted to a vector; converting a string to a vector raises an error
    to_vec = udf(lambda x: DenseVector([x]), VectorUDT())
    
    # Convert a string to an array
    to_array = udf(lambda x: [x], ArrayType(StringType()))
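
    A minimal usage sketch on a toy DataFrame (the column names num and s are made up for illustration):
    
    df = spark.createDataFrame([(1.0, 'a'), (2.0, 'b')], ['num', 's'])
    df = df.withColumn('num_vec', to_vec(col('num'))) \
           .withColumn('s_arr', to_array(col('s')))
    df.printSchema()  # num_vec is a vector column, s_arr is array<string>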
    

    2. Get the value at a given position in a vector or array column

    df = spark.createDataFrame([(1, [1,2,3]), (2, [4,5,6])], ['label', 'data'])
    df.show()
    df.printSchema()
    
    +-----+---------+
    |label|     data|
    +-----+---------+
    |    1|[1, 2, 3]|
    |    2|[4, 5, 6]|
    +-----+---------+
    
    root
     |-- label: long (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: long (containsNull = true)
    
    # Use the value of one column as an index to pick the element at that position in another column
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import FloatType
    
    firstelement = udf(lambda k, v: float(v[int(k)]), FloatType())
    df.withColumn('value', firstelement(col('label'), col('data'))).show(4, truncate=False)
    +-----+---------+-----+
    |label|data     |value|
    +-----+---------+-----+
    |1    |[1, 2, 3]|2.0  |
    |2    |[4, 5, 6]|6.0  |
    +-----+---------+-----+ 
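
    For plain array columns (not ML vectors) Spark also has built-in accessors, so a UDF is not strictly needed; a sketch (getItem / the [] operator is 0-based, while element_at, available since Spark 2.4, is 1-based):
    
    from pyspark.sql import functions as F
    
    df.withColumn('first', F.col('data')[0]).show()
    df.withColumn('second', F.element_at('data', 2)).show()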
    

    3. Split a single list column into multiple columns

    Reference: https://stackoverflow.com/questions/45789489/how-to-split-a-list-to-multiple-columns-in-pyspark
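
    A minimal sketch of that idea, reusing the df from section 2 and assuming the array has a known fixed length (here 3):
    
    from pyspark.sql import functions as F
    
    n = 3  # assumed fixed length of the array column
    df.select('label', *[F.col('data')[i].alias('data_' + str(i)) for i in range(n)]).show()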

    4. Get the top n rows of each group

    Reference: https://stackoverflow.com/questions/38397796/retrieve-top-n-in-each-group-of-a-dataframe-in-pyspark

    rdd = sc.parallelize([("user_1",  "object_1",  3), 
                          ("user_1",  "object_2",  2), 
                          ("user_2",  "object_1",  5), 
                          ("user_2",  "object_2",  2), 
                          ("user_2",  "object_2",  6)])
    df = sqlContext.createDataFrame(rdd, ["user_id", "object_id", "score"])
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, col
    
    window = Window.partitionBy(df['user_id']).orderBy(df['score'].desc())
    
    df.select('*', rank().over(window).alias('rank')) \
      .filter(col('rank') <= 2) \
      .show()
    #+-------+---------+-----+----+
    #|user_id|object_id|score|rank|
    #+-------+---------+-----+----+
    #| user_1| object_1|    3|   1|
    #| user_1| object_2|    2|   2|
    #| user_2| object_2|    6|   1|
    #| user_2| object_1|    5|   2|
    #+-------+---------+-----+----+
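
    rank() can return more than n rows per group when scores are tied; if exactly n rows per group are needed, row_number() is a drop-in replacement (a sketch):
    
    from pyspark.sql.functions import row_number
    
    df.select('*', row_number().over(window).alias('rn')) \
      .filter(col('rn') <= 2) \
      .show()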
    

    5. Convert a string to a date

    import pyspark.sql.functions as F
    from pyspark.sql.functions import min, max
    from pyspark.sql.types import DateType
    
    df = spark.createDataFrame([
      "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
    ).selectExpr("CAST(value AS date) AS date")
    
    # Or equivalently
    df = spark.createDataFrame([
      "2017-01-01", "2018-02-08", "2019-01-03", "2019-01-01"], "string"
    ).withColumn('date', F.col('value').cast(DateType()))
    
    min_date, max_date = df.select(min("date"), max("date")).first()
    min_date, max_date
    
    # (datetime.date(2017, 1, 1), datetime.date(2019, 1, 3))
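
    If the strings are not in the default yyyy-MM-dd format, to_date with an explicit pattern is a common alternative; a sketch assuming day/month/year strings:
    
    df2 = spark.createDataFrame([("15/01/2018",), ("14/02/2018",)], ["value"]) \
               .withColumn("date", F.to_date("value", "dd/MM/yyyy"))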
    

    6. Detect missing values

    from functools import reduce
    import pyspark.sql.functions as F
    
    d1 = spark.createDataFrame([(10,'a',None), (20, 'b',3), (30, 'c',4), 
                                (20, 'b',5), (30, 'd',6), (40, None,7), 
                                (None, 'e',8)], ['value', 'key','v2'])
    d1 = d1.select('key', 'value', 'v2')
    
    d1.where(reduce(lambda x, y: x | y, (F.col(c).isNull() for c in d1.columns))).show()
    
    # Or, spelled out column by column
    d1.where(F.col('key').isNull() | F.col('value').isNull() | F.col('v2').isNull()).show()
    
    # +----+-----+----+
    # | key|value|  v2|
    # +----+-----+----+
    # |   a|   10|null|
    # |null|   40|   7|
    # |   e| null|   8|
    # +----+-----+----+
    

    Going a step further, also check for NaN values and empty strings:

    import numpy as np
    
    d1 = spark.createDataFrame([(10,'a', None), (20, 'b', 3.), (30, 'c', 4.), 
                                (20, 'b', 5.), (30, 'd', np.nan), (40, None, 7.), 
                                (None, 'e', 8.), (50, '', 8.)], ['value', 'key','v2'])
    d1 = d1.select('key', 'value', 'v2')
    d1.show()
    # +----+-----+----+
    # | key|value|  v2|
    # +----+-----+----+
    # |   a|   10|null|
    # |   b|   20| 3.0|
    # |   c|   30| 4.0|
    # |   b|   20| 5.0|
    # |   d|   30| NaN|
    # |null|   40| 7.0|
    # |   e| null| 8.0|
    # |    |   50| 8.0|
    # +----+-----+----+
    
    # Keep rows whose key is neither null nor empty and whose v2 is not NaN
    # (a null v2 still passes, because isnan(null) evaluates to false)
    d1.where((F.col('key').isNotNull()) & (F.col('key') != '') & (~F.isnan(F.col('v2')))).show()
    # +---+-----+----+
    # |key|value|  v2|
    # +---+-----+----+
    # |  a|   10|null|
    # |  b|   20| 3.0|
    # |  c|   30| 4.0|
    # |  b|   20| 5.0|
    # |  e| null| 8.0|
    # +---+-----+----+
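
    To get an overview instead of filtering rows, a common pattern is to count the missing values per column (a sketch; the NaN check only makes sense for the numeric column v2):
    
    d1.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in d1.columns]).show()
    d1.select(F.count(F.when(F.col('v2').isNull() | F.isnan('v2'), 'v2')).alias('v2_missing')).show()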
    

    7. Fill gaps in a non-contiguous time series

    Reference: https://walkenho.github.io/interpolating-time-series-p2-spark/
    https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark

    or https://stackoverflow.com/questions/39271374/pyspark-how-to-resample-frequencies

    import random
    import numpy as np
    import pandas as pd
    
    data = {'readtime' : pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')\
                           .append(pd.date_range(start='1/15/2018', end='02/14/2018', freq='D')),
            'house' : ['house1' for i in range(31)] + ['house2' for i in range(31)],
            'readvalue': [0.5+0.5*np.sin(2*np.pi/30*i) for i in range(31)]\
                         + [0.5+0.5*np.cos(2*np.pi/30*i) for i in range(31)]}
    df0 = pd.DataFrame(data, columns = ['readtime', 'house', 'readvalue'])
    random.seed(42)
    df0 = df0.drop(random.sample(range(df0.shape[0]), k=int(df0.shape[0]/2)))
    df0.head()
    
        readtime    house   readvalue
    2   2018-01-17  house1  0.703368
    5   2018-01-20  house1  0.933013
    7   2018-01-22  house1  0.997261
    8   2018-01-23  house1  0.997261
    14  2018-01-29  house1  0.603956
    
    import pyspark.sql.functions as func
    from pyspark.sql.functions import col
    df = spark.createDataFrame(df0)
    df = df.withColumn("readtime", col('readtime')/1e9)\
           .withColumn("readtime_existent", col("readtime"))
    df.show(10, False)
    
    +-----------+------+--------------------+-----------------+
    |readtime   |house |readvalue           |readtime_existent|
    +-----------+------+--------------------+-----------------+
    |1.5161472E9|house1|0.7033683215379001  |1.5161472E9      |
    |1.5164064E9|house1|0.9330127018922193  |1.5164064E9      |
    |1.5165792E9|house1|0.9972609476841366  |1.5165792E9      |
    |1.5166656E9|house1|0.9972609476841368  |1.5166656E9      |
    |1.517184E9 |house1|0.6039558454088798  |1.517184E9       |
    |1.5172704E9|house1|0.5000000000000001  |1.5172704E9      |
    |1.5174432E9|house1|0.2966316784621001  |1.5174432E9      |
    |1.5175296E9|house1|0.2061073738537635  |1.5175296E9      |
    |1.5177024E9|house1|0.06698729810778081 |1.5177024E9      |
    |1.5177888E9|house1|0.024471741852423234|1.5177888E9      |
    +-----------+------+--------------------+-----------------+
    only showing top 10 rows
    

    Filling the time gaps takes three steps:

    • In the first step, we group the data by ‘house’ and generate an array containing an equally spaced time grid for each house.

    • In the second step, we create one row for each element of the arrays by using the spark SQL function explode().

    • In the third step, the resulting structure is used as a basis to which the existing read value information is joined using a left outer join.

    from pyspark.sql.types import *
    
    # define function to create date range
    def date_range(t1, t2, step=60*60*24):
        """Return a list of equally spaced points between t1 and t2 with stepsize step."""
        return [t1 + step*x for x in range(int((t2-t1)/step)+1)]
    
    # define udf
    date_range_udf = func.udf(date_range, ArrayType(LongType()))
    
    # obtain min and max of time period for each house
    df_base = df.groupBy('house')\
                .agg(func.min('readtime').cast('integer').alias('readtime_min'), 
                     func.max('readtime').cast('integer').alias('readtime_max'))
    
    # generate timegrid and explode
    df_base = df_base.withColumn("readtime", func.explode(date_range_udf("readtime_min", "readtime_max")))\
                 .drop('readtime_min', 'readtime_max')
    
    # left outer join existing read values
    df_all_dates = df_base.join(df, ["house", "readtime"], "leftouter")
    
    tmp = df_all_dates.withColumn('readtime', func.from_unixtime(col('readtime')))
    tmp.orderBy('house','readtime').show(20, False)
    
    +------+-------------------+--------------------+-----------------+
    |house |readtime           |readvalue           |readtime_existent|
    +------+-------------------+--------------------+-----------------+
    |house1|2018-01-17 08:00:00|0.7033683215379001  |1.5161472E9      |
    |house1|2018-01-18 08:00:00|null                |null             |
    |house1|2018-01-19 08:00:00|null                |null             |
    |house1|2018-01-20 08:00:00|0.9330127018922193  |1.5164064E9      |
    |house1|2018-01-21 08:00:00|null                |null             |
    |house1|2018-01-22 08:00:00|0.9972609476841366  |1.5165792E9      |
    |house1|2018-01-23 08:00:00|0.9972609476841368  |1.5166656E9      |
    |house1|2018-01-24 08:00:00|null                |null             |
    |house1|2018-01-25 08:00:00|null                |null             |
    |house1|2018-01-26 08:00:00|null                |null             |
    |house1|2018-01-27 08:00:00|null                |null             |
    |house1|2018-01-28 08:00:00|null                |null             |
    |house1|2018-01-29 08:00:00|0.6039558454088798  |1.517184E9       |
    |house1|2018-01-30 08:00:00|0.5000000000000001  |1.5172704E9      |
    |house1|2018-01-31 08:00:00|null                |null             |
    |house1|2018-02-01 08:00:00|0.2966316784621001  |1.5174432E9      |
    |house1|2018-02-02 08:00:00|0.2061073738537635  |1.5175296E9      |
    |house1|2018-02-03 08:00:00|null                |null             |
    |house1|2018-02-04 08:00:00|0.06698729810778081 |1.5177024E9      |
    |house1|2018-02-05 08:00:00|0.024471741852423234|1.5177888E9      |
    +------+-------------------+--------------------+-----------------+
    only showing top 20 rows
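
    With the gaps now materialized as null rows, a natural next step (the referenced post goes on to interpolate) is to forward-fill the last known reading within each house, using last() with ignorenulls=True over a window; a sketch:
    
    from pyspark.sql.window import Window
    
    w = Window.partitionBy('house').orderBy('readtime') \
              .rowsBetween(Window.unboundedPreceding, 0)
    
    df_filled = df_all_dates.withColumn(
        'readvalue_filled', func.last('readvalue', ignorenulls=True).over(w))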
    

    8. Difference between the current row and the previous row

    Reference: https://www.arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    sc = SparkContext(appName="PrevRowDiffApp")
    sqlc = SQLContext(sc)
    
    rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])
    
    df = sqlc.createDataFrame(rdd, ["id", "value"])
    
    my_window = Window.partitionBy().orderBy("id")
    
    df = df.withColumn("prev_value", F.lag(df.value).over(my_window))
    df = df.withColumn("diff", F.when(F.isnull(df.value - df.prev_value), 0)
                                  .otherwise(df.value - df.prev_value))
    
    df.show()
    +---+-----+----------+----+
    | id|value|prev_value|diff|
    +---+-----+----------+----+
    |  1|   65|      null|   0|
    |  2|   66|        65|   1|
    |  3|   65|        66|  -1|
    |  4|   68|        65|   3|
    |  5|   71|        68|   3|
    +---+-----+----------+----+
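
    The when/isnull guard can equivalently be written with coalesce, which reads a bit shorter (a sketch):
    
    df = df.withColumn("diff", F.coalesce(df.value - F.lag(df.value).over(my_window), F.lit(0)))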
    

    9. How can a UDF take extra arguments, e.g. to compute different quantiles of a list column?

    Use a closure: https://stackoverflow.com/questions/52843485/pyspark-pass-multiple-columns-along-with-an-argument-in-udf

    import numpy as np
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType
    
    def quantile_udf(q=0.5):
        """Return a UDF that computes the q-th quantile of an array column."""
        def quantile_(arr):
            return float(np.quantile(arr, q))
        return F.udf(quantile_, DoubleType())
    
    history_stat = history_stat.withColumn('quantile_025', quantile_udf(0.25)(F.col('date_diff_list')))\
                               .withColumn('quantile_05', quantile_udf(0.5)(F.col('date_diff_list')))\
                               .withColumn('quantile_075', quantile_udf(0.75)(F.col('date_diff_list')))
    
    
    history_stat.show(2)
    +---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
    |cust_store_dkey|cust_product_dkey| first_day|  last_day|sale_times|max_sales_interval|mean_sales_interval|      date_diff_list|quantile_025|quantile_05|quantile_075|
    +---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
    |            560|              211|2017-12-10|2019-03-08|       180|                16| 2.5166666666666666|[0, 11, 4, 11, 1,...|         1.0|        1.0|         3.0|
    |            560|              990|2016-12-30|2017-03-17|        20|                26|               3.85|[0, 1, 1, 1, 2, 3...|         1.0|        2.0|         4.0|
    +---------------+-----------------+----------+----------+----------+------------------+-------------------+--------------------+------------+-----------+------------+
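
    Since history_stat comes from an earlier pipeline that is not shown here, the same closure pattern on a small made-up DataFrame looks like this (the column names id and vals are placeholders):
    
    toy = spark.createDataFrame([(1, [1, 2, 3, 4]), (2, [10, 20, 30])], ['id', 'vals'])
    toy.withColumn('p25', quantile_udf(0.25)(F.col('vals'))) \
       .withColumn('p75', quantile_udf(0.75)(F.col('vals'))) \
       .show()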
    

    10. Replace values based on a condition

    Reference: https://stackoverflow.com/questions/44773758/how-to-conditionally-replace-value-in-a-column-based-on-evaluation-of-expression

    import numpy as np
    
    df = spark.createDataFrame(
        [(1, 1, None),
         (1, 2, float(5)),
         (1, 3, np.nan),
         (1, 4, None),
         (0, 5, float(10)),
         (1, 6, float('nan')),
         (0, 6, float('nan'))],
        ('session', "timestamp1", "id2"))
    
    +-------+----------+----+
    |session|timestamp1| id2|
    +-------+----------+----+
    |      1|         1|null|
    |      1|         2| 5.0|
    |      1|         3| NaN|
    |      1|         4|null|
    |      0|         5|10.0|
    |      1|         6| NaN|
    |      0|         6| NaN|
    +-------+----------+----+
    
    from pyspark.sql.functions import when
    
    targetDf = df.withColumn("timestamp1", \
                  when(df["session"] == 0, 999).otherwise(df["timestamp1"]))
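
    The same when/otherwise pattern can be combined with isnan, for example to turn the NaN values in id2 into proper nulls (a sketch):
    
    from pyspark.sql.functions import when, isnan, col, lit
    
    df_clean = df.withColumn("id2", when(isnan(col("id2")), lit(None)).otherwise(col("id2")))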
    

    11. How to convert a sparse vector to an array?

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, FloatType
    
    vector_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))
    
    df = df.withColumn('col1', vector_udf('col2'))
    

    Note that the tolist() inside the UDF is required, because Spark has no np.array type. Similarly, when returning a value of a numpy dtype, it must first be converted with float()/int().
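
    As an aside, on Spark 3.0+ the built-in pyspark.ml.functions.vector_to_array can replace the UDF entirely (it returns array<double> by default); a sketch:
    
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col
    
    df = df.withColumn('col1', vector_to_array(col('col2')))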

    12. How to convert a PySpark sparse vector to a scipy sparse matrix and a PyTorch sparse tensor?

    Reference: https://stackoverflow.com/questions/40557577/pyspark-sparse-vectors-to-scipy-sparse-matrix
    Converting a Spark sparse vector to a scipy csr matrix:

    import numpy as np
    import torch
    from scipy.sparse import csr_matrix, vstack
    
    def as_matrix(vec):
        data, indices = vec.values, vec.indices
        shape = 1, vec.size
        return csr_matrix((data, indices, np.array([0, vec.values.size])), shape)
    
    # cv_cols are the sparse vector columns produced by CountVectorizer in Spark
    train_pd[csr_cols] = train_pd[cv_cols].applymap(lambda x: as_matrix(x))
    
    # The code above turns each row's Spark sparse vector into a scipy csr matrix;
    # the line below stacks all row-level csr matrices of one column into a single big csr matrix
    csr_col1 = vstack(train_pd['csr_col1'])
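
    train_pd is not defined above; it is presumably a pandas DataFrame obtained by collecting the Spark DataFrame to the driver, roughly as in the sketch below (train_sdf, 'label', cv_cols and csr_cols are placeholder names):
    
    # The vector columns arrive in pandas as pyspark.ml.linalg.SparseVector objects
    train_pd = train_sdf.select('label', *cv_cols).toPandas()
    csr_cols = [c + '_csr' for c in cv_cols]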
    

    The code above converts the sparse vectors into a scipy sparse matrix, specifically a scipy csr matrix.
    
    Next, convert the scipy csr matrix into a PyTorch sparse tensor.

    def sparse2tensor(tmpdf):
        """
        tmpdf is a scipy csr matrix (or a collection of them), e.g. the csr_col1 obtained above.
        """
        tmpdf_coo = vstack(tmpdf).tocoo()
        # torch.Size below ensures the resulting sparse tensor keeps the same shape
        sptensor = torch.sparse.FloatTensor(torch.LongTensor([tmpdf_coo.row.tolist(), tmpdf_coo.col.tolist()]),
                                            torch.FloatTensor(tmpdf_coo.data), torch.Size(tmpdf_coo.shape))
        return sptensor
    
    spt = sparse2tensor(csr_col1)
    

    When the data is very high-dimensional and sparse, using sparse matrices/tensors can reduce memory usage dramatically, which makes this a very practical technique.

    13. Sum the values of a sparse vector

    def sum_vector(vector):
        """Sum all stored values of a SparseVector (or DenseVector)."""
        return float(vector.values.sum())
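
    To apply it to a DataFrame it still needs to be wrapped as a UDF; a minimal sketch, assuming a vector column named features (the column name is made up):
    
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType
    
    sum_vector_udf = F.udf(sum_vector, DoubleType())
    df = df.withColumn('features_sum', sum_vector_udf('features'))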
    
