spark

Author: 渡猫 | Published 2018-09-27 18:05

    lag, lead: shifting rows within a window

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lag, lead
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName('my_first_app').getOrCreate()
    df = spark.createDataFrame([(1, 2, 3), (4, 2, 6), (7, 2, 9)],
                               ["a", "b", "c"])
    df.show()
    # lag('a') pulls the previous row's value of a within each partition;
    # the first row of each partition gets null
    df.withColumn('aa', lag('a').over(
        Window.partitionBy('b').orderBy('a')
    )).show()
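    lead is the symmetric case: it pulls the value from the following row instead of the preceding one. A minimal sketch reusing the df defined above:

    # lead('a') looks one row ahead; the last row of each partition gets null
    df.withColumn('aa', lead('a').over(
        Window.partitionBy('b').orderBy('a')
    )).show()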
    

    scala: loading an xxx.scala file

    // the leading colon cannot be omitted (this is a spark-shell/REPL command)
    :load xxx.scala
    

    join

    // when both sides share a column name, qualify it with its parent DataFrame
    df1.join(df2, df1("a") === df2("a")).select(df1("f")).show(2)
    
    // set table aliases, then disambiguate with $"alias.column"
    // (the $ syntax requires import spark.implicits._)
    df1.alias("df1").join(df2.alias("df2"), "a").select($"a", $"df1.f" + $"df2.f").show(5)
    

    Computing the fraction of missing values per column

    # F.count(c) counts only non-null values, F.count('*') counts all rows
    test.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in test.columns]).show()
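    A self-contained example of what this produces (this small test DataFrame is hypothetical):

    # column y has 2 nulls out of 4 rows, so y_missing should be 0.5
    test = spark.createDataFrame(
        [(1, 'a'), (2, None), (3, 'c'), (4, None)], ["x", "y"])
    test.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing')
               for c in test.columns]).show()
    # expected: x_missing = 0.0, y_missing = 0.5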
    

    Custom functions with F.udf

    from pyspark.sql.types import IntegerType

    def get_week(time):
        return time.weekday()
    toWeekUDF = F.udf(get_week, IntegerType())  # arg 1 is the function, arg 2 the return type
    test.select(toWeekUDF(test.operTime)).show(5)
    test.select(F.dayofweek('operTime')).show(5)  # built-in alternative (see note below)
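    Note that the two calls do not agree on numbering: Python's weekday() runs Monday=0 through Sunday=6, while Spark's dayofweek runs Sunday=1 through Saturday=7. A sketch of a UDF aligned to the built-in convention:

    # map weekday() (Mon=0..Sun=6) onto dayofweek's numbering (Sun=1..Sat=7)
    def get_dayofweek(time):
        return (time.weekday() + 1) % 7 + 1
    toDayOfWeekUDF = F.udf(get_dayofweek, IntegerType())
    test.select(toDayOfWeekUDF(test.operTime)).show(5)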
    
    # passing multiple arguments: every argument to a UDF must be a Column,
    # so wrap the literal column name with F.lit
    def map_dict(x, col):
        return feature_dict_sc.value[col][x]
    mapDictUDF = F.udf(map_dict, IntegerType())
    test.select(mapDictUDF(test.siteId, F.lit('siteId')).alias('ss')).show(5)
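    feature_dict_sc above is assumed to be a broadcast variable holding per-column lookup dictionaries; a hypothetical setup:

    # per-column value-to-index mappings, shipped to each executor once
    # via broadcast instead of being serialized into every task closure
    feature_dict = {'siteId': {'site_a': 0, 'site_b': 1}}
    feature_dict_sc = spark.sparkContext.broadcast(feature_dict)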
    
    # alternative: close over the column name instead of passing it as a Column;
    # with no explicit return type, F.udf defaults to StringType
    def mapDict(col):
        return F.udf(lambda x: feature_dict_sc.value[col][x])
    test.withColumn('ss', mapDict('siteId')(test.siteId)).show(5)
    

    The F function library

    agg takes whole-column (aggregate) functions such as min and max; select takes per-row functions such as hour and dayofweek. A short illustration follows.
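    Reusing the hypothetical test DataFrame with siteId and operTime columns from the UDF examples above:

    # aggregate functions collapse a whole column into a single row
    test.agg(F.min('siteId'), F.max('siteId')).show()
    # per-row functions return one value per input row
    test.select(F.hour('operTime'), F.dayofweek('operTime')).show(5)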
