A Custom Aggregate Function for Spark's agg: the Median

Author: chenxk | Published 2019-04-19 18:40

    Spark's built-in median computation cannot be used inside groupBy's agg function; the code below implements a version that can be called from agg.

    Computing a median with native Spark:

    df2.stat.approxQuantile("value", Array(0.5), 0)
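
    For comparison, approxQuantile is an action on DataFrameStatFunctions: it computes quantiles over the whole DataFrame and returns an Array[Double], which is why it cannot appear as a per-group expression inside agg. If you only need a per-group median and want to avoid the custom wrapper below, the percentile_approx SQL function can be invoked through expr. A minimal sketch, assuming df2 also has a grouping column named "key" (an assumption; the snippet above only shows a "value" column):

    import org.apache.spark.sql.functions.expr

    df2.groupBy("key")
      .agg(expr("percentile_approx(value, 0.5)") as "median_value")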
    

    1. Define the method

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
    
    
    object PercentileApprox {
      // Wrap Catalyst's internal ApproximatePercentile expression so it can be
      // used as a Column inside groupBy().agg(). Note that this relies on an
      // internal API, which may change between Spark versions.
      def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
        val expr = new ApproximatePercentile(
          col.expr, percentage.expr, accuracy.expr
        ).toAggregateExpression
        new Column(expr)
      }

      // Two-argument overload that falls back to the default accuracy.
      def percentile_approx(col: Column, percentage: Column): Column = percentile_approx(
        col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
      )
    }
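
    The accuracy argument trades precision for memory: higher values yield a tighter approximation (Spark documents the relative error as roughly 1.0/accuracy) at the cost of a larger sketch per group. A minimal sketch of the three-argument overload, with 1000000 chosen purely as an example value:

    // Tighter approximation than the default accuracy, at higher memory cost.
    percentile_approx($"price", lit(0.5), lit(1000000))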
    

    2. Call the method

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import PercentileApprox._ // the wrapper object defined in step 1 (same package assumed)

    object AlgorithmTest {

      def main(args: Array[String]): Unit = {
        //System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master")

        val spark = SparkSession
          .builder
          .appName("InterfaceMonitor")
          .master("local[2]")
          .getOrCreate()

        import spark.implicits._

        val ds = Seq(
          ("20181102221610c07vy","10000011","10000032",20.0,1,20.0,0 ,"2019-04-19 22:16:10.0"),
          ("20181102221733dgvcv","10000011","10000032",20.0,1,20.0,0 ,"2019-04-19 22:17:34.0"),
          ("20181102222339oakpn","10000061","10000032",0.2 ,1,0.2 ,5 ,"2019-04-19 22:23:39.0"),
          ("20181102225503nhath","10000061","10000032",20.0,1,20.0,7 ,"2019-04-19 22:55:03.0"),
          ("201811030008236k9yy","10000061","10000032",0.2 ,1,0.2 ,5 ,"2019-04-19 00:08:23.0"),
          ("20181103005135do5zg","10000069","10000015",0.2 ,1,0.2 ,0 ,"2019-04-19 00:51:35.0"),
          ("20181103005148ptr7a","10000069","10000015",0.2 ,1,0.2 ,0 ,"2019-04-19 00:51:48.0"),
          ("20181103005148w9isk","10000069","10000015",0.2 ,1,0.2 ,5 ,"2019-04-19 00:51:48.0"),
          ("20181103005205b8gvm","10000069","10000015",0.2 ,1,0.2 ,0 ,"2019-04-19 00:52:05.0"),
          ("20181103015930m2cz0","10000011","10000063",30.0,1,30.0,0 ,"2019-04-19 01:59:30.0")
        ).toDS()
          .toDF("order_id","play_uid","god_uid","price","num","order_amount","order_status","create_time")

        // val df = spark.read.format("json").json("file:///e:\\a.json")  // unused

    
        ds.groupBy($"god_uid")
          .agg(sum("num") as "total_sum",
            approx_count_distinct("order_id") as "order_num",
            mean("price") as "avg_price",
            percentile_approx($"price", lit(0.5)) as "median_price")
          .show(10)

        spark.stop()
      }
    }
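
    Note that on Spark 3.1 and later, org.apache.spark.sql.functions ships a built-in percentile_approx, so the wrapper from step 1 is unnecessary there. A minimal sketch, assuming the same ds as above:

    import org.apache.spark.sql.functions.{lit, percentile_approx}

    ds.groupBy($"god_uid")
      .agg(percentile_approx($"price", lit(0.5), lit(10000)) as "median_price")
      .show(10)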
    

    Reference: https://stackoverflow.com/questions/53548964/how-to-use-approxquantile-by-group
