SparkSQL: Using a Double GROUP BY to Fix Data Skew

Author: 阿坤的博客 | Published: 2018-11-12 16:44 | Views: 86

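This article shows how to add a random-number prefix to keys with a custom UDF, and then aggregate in two successive GROUP BY passes to relieve data skew.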

Contents:

  • 1. Custom UDFs
  • 2. Data flow
  • 3. Spark program

1. Custom UDFs

RandomPrefixUDF.java

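import java.util.concurrent.ThreadLocalRandom;

import org.apache.spark.sql.api.java.UDF2;

/**
 * Adds a random prefix to a field value.
 * Registered in SQL as random_prefix().
 *
 * @author Administrator
 */
public class RandomPrefixUDF implements UDF2<String, Integer, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val, Integer num) throws Exception {
        // Pick a bucket in [0, num) without allocating a new Random for every row
        int randNum = ThreadLocalRandom.current().nextInt(num);
        return randNum + "_" + val;
    }
}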

Adds a random prefix to a field value.

RemoveRandomPrefixUDF.java

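import org.apache.spark.sql.api.java.UDF1;

/**
 * Removes the random prefix.
 *
 * @author Administrator
 */
public class RemoveRandomPrefixUDF implements UDF1<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val) throws Exception {
        // Split on the first "_" only, so keys that themselves contain "_" stay intact
        String[] valSplited = val.split("_", 2);
        return valSplited[1];
    }
}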

Removes the random prefix from a field value.
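
The round trip performed by the two UDFs can be checked without Spark. The following is a minimal plain-Java sketch; the class `SaltedKeys` and its method names are hypothetical and not part of the original article. It strips everything up to the first underscore, which also keeps keys that contain underscores of their own intact.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration only; not part of the article's code.
final class SaltedKeys {
    private SaltedKeys() {}

    // Prepend a random bucket number in [0, buckets) followed by "_".
    static String addPrefix(String key, int buckets) {
        int bucket = ThreadLocalRandom.current().nextInt(buckets);
        return bucket + "_" + key;
    }

    // Drop everything up to and including the first "_".
    // If there is no "_", indexOf returns -1 and the input is returned unchanged.
    static String removePrefix(String salted) {
        return salted.substring(salted.indexOf('_') + 1);
    }

    public static void main(String[] args) {
        String salted = addPrefix("iphone_x", 2); // "0_iphone_x" or "1_iphone_x"
        System.out.println(salted + " -> " + removePrefix(salted));
    }
}
```

Whatever bucket is drawn, `removePrefix(addPrefix(key, n))` returns the original key, which is the property the second GROUP BY relies on.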

2. Data flow

Flow without a random prefix

A 1
A 1
A 1
A 1
B 1

Result:

A 4
B 1

Flow with a random prefix

A 1
A 1
A 1
A 1
B 1

-- Add the random prefix

0_A 1
0_A 1
1_A 1
1_A 1
0_B 1

-- First GROUP BY

0_A 2
1_A 2
0_B 1

-- Remove the random prefix

A 2
A 2
B 1

-- Second GROUP BY

A 4
B 1
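
The same flow can be traced in plain Java, without Spark. This is only a sketch: the class `TwoStageGroupBy` and its methods are hypothetical, and the salt is derived from the row index (`i % buckets`) instead of a random number so that the result is reproducible and matches the walkthrough above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the two-stage aggregation; not the article's code.
final class TwoStageGroupBy {
    private TwoStageGroupBy() {}

    // Stage 1: salt each key and count clicks per salted key.
    // Assumption: the salt is (row index % buckets) to keep the demo deterministic;
    // the article's UDF draws a random number instead.
    static Map<String, Integer> stageOne(List<String> keys, int buckets) {
        Map<String, Integer> partial = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            String salted = (i % buckets) + "_" + keys.get(i);
            partial.merge(salted, 1, Integer::sum);
        }
        return partial;
    }

    // Stage 2: strip the salt and sum the partial counts per original key.
    static Map<String, Integer> stageTwo(Map<String, Integer> partial) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map.Entry<String, Integer> e : partial.entrySet()) {
            String salted = e.getKey();
            String key = salted.substring(salted.indexOf('_') + 1);
            total.merge(key, e.getValue(), Integer::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("A", "A", "A", "A", "B");
        Map<String, Integer> partial = stageOne(keys, 2);
        System.out.println(partial);           // {0_A=2, 0_B=1, 1_A=2}
        System.out.println(stageTwo(partial)); // {A=4, B=1}
    }
}
```

The hot key A is split into two partial groups of 2 in the first stage, so no single group has to process all four rows; the second stage only has to combine one partial result per bucket.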

3. Spark program

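import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType}

/**
  * Specify the schema directly with StructType and convert the RDD to a DataFrame
  */
object TestUDF {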
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder()
        .appName("TestUDF")
        .master("local")
        .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
    spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)

    val personRDD =
      sc.parallelize(List("A", "A", "A", "A", "B"), 1)
        .map(x => Row(x, 1))

    // Build the schema
    val schema: StructType = StructType(Seq(
      StructField("product", StringType, false),
      StructField("click", IntegerType, false)
    ))

    val personDF = spark.createDataFrame(personRDD, schema)

    // Query through SQL
    personDF.createOrReplaceTempView("t_product_click")

    // Add the random prefix
    val sql1 =
      s"""
         |select
         |  random_prefix(product, 2) product,
         |  click
         |from
         |  t_product_click
       """.stripMargin

    // First GROUP BY: sum per prefixed key
    val sql2 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      random_prefix(product, 2) product,
         |      click
         |    from
         |      t_product_click
         |  ) t1
         |group by
         |  product
       """.stripMargin

    // Remove the random prefix
    val sql3 =
      s"""
         |select
         |  remove_random_prefix(product) product,
         |  click
         |from
         |  (
         |    select
         |      product,
         |      sum(click) click
         |    from
         |      (
         |        select
         |          random_prefix(product, 2) product,
         |          click
         |        from
         |          t_product_click
         |      ) t1
         |    group by
         |      product
         |  ) t2
         |
       """.stripMargin

    // Second GROUP BY: sum per original key
    val sql4 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      remove_random_prefix(product) product,
         |      click
         |    from
         |      (
         |        select
         |          product,
         |          sum(click) click
         |        from
         |          (
         |            select
         |              random_prefix(product, 2) product,
         |              click
         |            from
         |              t_product_click
         |          ) t1
         |        group by
         |          product
         |      ) t2
         |  ) t3
         |group by
         |  product
       """.stripMargin

    spark.sql(sql1).show()
    spark.sql(sql2).show()
    spark.sql(sql3).show()
    spark.sql(sql4).show()
    spark.stop()
  }
}

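Output (each query calls random_prefix again, so the prefixes in the first three tables do not line up with one another; only the final totals are deterministic):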

+-------+-----+
|product|click|
+-------+-----+
|    0_A|    1|
|    1_A|    1|
|    0_A|    1|
|    0_A|    1|
|    1_B|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|    1_A|    3|
|    1_B|    1|
|    0_A|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      A|    1|
|      B|    1|
|      A|    3|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      B|    1|
|      A|    4|
+-------+-----+

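Here a random prefix is added to every key. Alternatively, you can sample the data first, identify the keys that will cause skew, and add the random prefix only to those keys. The way the prefix is generated can also be replaced with a custom algorithm.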
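
One possible shape of the sampling approach is sketched below in plain Java. Everything here is an assumption made for illustration: the class `SelectiveSalting`, the frequency threshold, and the rule that a key counts as "hot" when its share of the sample exceeds that threshold. It also assumes that no original key already looks like `<number>_<hot key>`, since such a key could not be told apart from a salted one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of salting only the skewed keys; not the article's code.
final class SelectiveSalting {
    private SelectiveSalting() {}

    // Keys whose share of the sample is strictly greater than the threshold.
    static Set<String> hotKeys(List<String> sample, double threshold) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : sample) {
            counts.merge(key, 1, Integer::sum);
        }
        Set<String> hot = new TreeSet<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if ((double) e.getValue() / sample.size() > threshold) {
                hot.add(e.getKey());
            }
        }
        return hot;
    }

    // Salt only hot keys; all other keys pass through unchanged.
    static String saltIfHot(String key, Set<String> hot, int buckets) {
        if (!hot.contains(key)) {
            return key;
        }
        return ThreadLocalRandom.current().nextInt(buckets) + "_" + key;
    }

    // Strip the prefix only when the remainder is a known hot key.
    static String unsalt(String key, Set<String> hot) {
        int i = key.indexOf('_');
        if (i > 0 && hot.contains(key.substring(i + 1))) {
            return key.substring(i + 1);
        }
        return key;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("A", "A", "A", "A", "B");
        Set<String> hot = hotKeys(sample, 0.5);
        System.out.println(hot); // [A]
        System.out.println(saltIfHot("B", hot, 2)); // B
    }
}
```

With this variant the first GROUP BY does extra work only for the keys that actually cause skew, at the cost of a sampling pass and of carrying the hot-key set into both the salting and unsalting steps.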
