美文网首页
Spark经典案例之数据去重

Spark经典案例之数据去重

作者: printf200 | 来源:发表于2019-04-18 08:16 被阅读0次

    /**

    • 业务场景:数据去重问题
    • Created by YJ on 2017/2/7.
    • 统计数据,尽量用reduceByKey,不要用groupByKey,优化点
    • reduceByKey,在本机suffle后,再发送一个总map,发送到一个总机器上汇总,(汇总要压力小)
    • groupByKey,发送本机所有的map,在一个机器上汇总(汇总压力大)
      /
      /

    数据格式
    flie1:
    2012-3-1 a
    2012-3-2 b
    2012-3-3 c
    2012-3-4 d
    2012-3-5 a
    2012-3-6 b
    2012-3-7 c
    2012-3-3 c
    flie2:
    2012-3-1 b
    2012-3-2 a
    2012-3-3 b
    2012-3-4 d
    2012-3-5 a
    2012-3-6 c
    2012-3-7 d
    2012-3-3 c
    */

    package ClassicCase
    
    import org.apache.spark.{SparkConf, SparkContext}
    object case2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("reduce")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        //获取数据
        val two = sc.textFile("hdfs://192.168.109.130:8020//user/flume/ClassicCase/case2/*")
        two.filter(_.trim().length>0) //需要有空格。
            .map(line=>(line.trim,""))//全部值当key,(key value,"")
              .groupByKey()//groupByKey,过滤重复的key value ,发送到总机器上汇总
                  .sortByKey() //按key value的自然顺序排序
                      .keys.collect().foreach(println) //所有的keys变成数组再输出
        //第二种有风险
        two.filter(_.trim().length>0)
              .map(line=>(line.trim,"1"))
                .distinct()
                    .reduceByKey(_+_)
                        .sortByKey()
                            .foreach(println)
    
        //reduceByKey,在本机suffle后,再发送一个总map,发送到一个总机器上汇总,(汇总要压力小)
        //groupByKey,发送本机所有的map,在一个机器上汇总(汇总压力大)
        //如果数据在不同的机器上,则会出现先重复数据,distinct,reduceBykey,只是在本机上去重,谨慎一点的话,在reduceByKey后面需要加多一个distinct
    
      }
    }
    
    

    方法二

    object FileDistinct {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("filedistinct").setMaster("local")
        val sc = new SparkContext(sparkConf)
    
        val rdd = sc.textFile("demo1/*")
        //1.filter去掉空行  map去掉空格
        rdd.filter(x => x.trim.length!=0).map(_.trim).distinct().foreach(println)
        //group必须有键值对  拿到key的值
        rdd.filter(x => x.trim.length!=0).map(x =>(x.trim,1)).groupByKey().keys.foreach(println)
        //3.
        rdd.filter(x => x.trim.length!=0).map(x =>(x.trim,1)).reduceByKey(_+_).keys.foreach(println)
      }
    }
    
    

    输出结果
    2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d (2012-3-1 a,1) (2012-3-1 b,1) (2012-3-2 a,1) (2012-3-2 b,1) (2012-3-3 b,1) (2012-3-3 c,1) (2012-3-4 d,1) (2012-3-5 a,1) (2012-3-6 b,1) (2012-3-6 c,1) (2012-3-7 c,1) (2012-3-7 d,1)

    reduceByKey和groupByKey区别与用法

    (1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:

    image

    (2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:

    image

    ( 3 )区别
    reduceByKey,在本机suffle后,再发送一个总map,发送到一个总机器上suffle汇总map,(汇总要压力小)
    groupByKey,发送本机所有的map,在一个机器上suffle汇总map(汇总压力大)

    相关文章

      网友评论

          本文标题:Spark经典案例之数据去重

          本文链接:https://www.haomeiwen.com/subject/liymuqtx.html