美文网首页Hbase 实战大数据亿级流量分析大数据,机器学习,人工智能
Spark + Hbase 百亿级流量实时分析统计 之 经典指标

Spark + Hbase 百亿级流量实时分析统计 之 经典指标

作者: 大猪大猪 | 来源:发表于2019-03-28 00:21 被阅读0次

    作为一个百亿级的流量实时分析统计系统怎么能没有PV/UV这两经典的超级玛丽亚指标呢,话说五百年前它俩可以鼻祖,咳咳...,不好意思没忍住,多嘴,回归正文,大猪 在上一篇已经介绍了 小巧高性能ETL程序设计与实现 了,到现在,我们的数据已经落地到Hbase上了,而且日志的时间也已经写到Mysql了,万事都已经具备了,接下来我们就要撸指标了,先从两个经典的指标开始。

    程序流程

    在运行指标之前我们有必要先理一下整个程序的计算流程,如下图:


    1. 开始计算是我们的Driver程序入口

    2. 开始计算之前检查监听Redis有没有收到程序退出通知,如果有程序结束,否则往下执行

    3. 首先去查询我们上篇文章的 ETL loghub 日志的进度的平均时间点

    4. Switch 处是判断 loghub 的时间距离我们上次计算的指标时间是否相差足够时间,一般定义为3分钟时间之后,因为 loghub 的时间会有少量的波动情况

    5. 不满足则 Sleep 30秒,可以自己控制Sleep范围。

    6. 满足则计算 上次指标计算结束时间 ~ (loghub时间 - 3分钟日志波动)

    7. 计算完成更新指标结果并且更新指标计算时间,然后回到第2点。

    程序实现

    先从入口开始看起

    //监听redis退出消息
    while (appRunning) {
          val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
          //日志offset
          val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
          //指标计算offset
          val indicatorTime =dbClient.query("indicator").toLocalDateTime
          //两个时间相差(分)
          val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes
    
          val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
          //相差足够时间则进行指标运行,否则睡眠
          if (betweenTimeMinutes >= 1) {
            app.run(spark, indicatorTime, loghubTime)
            //计算完成更新指标时间
            dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset")
          } else {
            TimeUnit.SECONDS.sleep(30)
          }
        }
    

    从注释上看,整体思路还是比较清晰的,接下来我们看一下run里面的方法做了什么有意思的操作

    conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
    conf.set("TableInputFormat.SCAN_ROW_START", start)
    conf.set("TableInputFormat.SCAN_ROW_START", end)
    val logDS = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat2],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
          .map(tp2 => HbaseUtil.resultToMap(tp2._2))
          .map(map => {
            LogCase(
              dt = DT(
                map.get("time").toLocalDateTimeStr(),
                map.get("time").toLocalDate().toString
              ),
              `type` = map.get("type"),
              aid = map.get("aid"),
              uid = map.get("uid"),
              tid = map.get("tid"),
              ip = map.get("ip")
            )
          }).toDS()
    
        logDS.cache()
        logDS.createTempView("log")
        //各类指标
        new PV().run()
        new UV().run()
    

    start跟end就是上面传下来的时间

    就是把我们的Hbase的分片数据转成了Spark的Dataset了,然后再创建Spark的一张log表。

    在UV跟PV就可以使用这张log表了,我们看看这两个经典的指标里面到底有什么乾坤

    跟着 大猪 先来看看PV指标

    spark.sql(
          """
            |SELECT
            |    aid,
            |    dt.date,
            |    COUNT(1) as pv
            |FROM
            |    log
            |GROUP BY
            |    aid,
            |    dt.date
          """.stripMargin)
          .rdd
          .foreachPartition(rows => {
            val props = PropsUtils.properties("db")
            val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
            rows.foreach(row => {
              dbClient.upsert(
                Map(
                  "time" -> row.getAs[String]("date"),
                  "aid" -> row.getAs[String]("aid")
                ),
                Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
                "common_report"
              )
            })
            dbClient.close()
          })
    

    哇然一看,大哥你这也写得太简单了吧,你当我是三岁小朋友咩

    就是一个普通的PV算法,再加上分区foreach操作,再把更到的每一行数据upsert到我们的common_report指标表嘛

    从这个方法我就能推算出common_report长什么样了,至少有time+aid这两个唯一索引字符,还有pv这个字符,默认值肯定是0。

    百闻不如一见,看看是不是这样子:

    create table common_report
    (
        id bigint auto_increment primary key,
        aid bigint not null,
        pv int default 0 null,
        uv int default 0 null,
        time date not null,
        constraint common_report_aid_time_uindex unique (aid, time)
    );
    

    果然一点都没错。

    dbClient.upsert里面大概也能猜到是实现了mysql的upsert功能,大概的sql就会生成:

    INSERT INTO common_report (time, aid, pv)
    VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1;
    

    大猪 那UV是怎么实现咧?一个用户在今天来过第一次之后再来就不能再算了噢。
    大猪答:这个简单简单,可以使用Redis去重嘛,但是我们使用的都是Hbase了,还使用它做蛤子,来来来,我们一起来看UV的实现的小技巧:

    val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
        import spark.implicits._
        logDS
          .mapPartitions(partitionT => {
            val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
            val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
            partitionT
              .grouped(Consts.BATCH_MAPPARTITIONS)
              .flatMap { tList =>
                tList
                  .zip(hbaseClient.incrments(tList.map(md5)))
                  .map(tp2 => {
                    val log = tp2._1
                    log.copy(ext = EXT(tp2._2))
                  })
              }
          }).createTempView("uvTable")
    
        spark.sql(
          """
            |SELECT
            |    aid,
            |    dt.date,
            |    COUNT(1) as uv
            |FROM
            |    uvTable
            |WHERE
            |    ext.render = 1
            |GROUP BY
            |    aid,
            |    dt.date
          """.stripMargin)
          .rdd
          .foreachPartition(rows => {
            val props = PropsUtils.properties("db")
            val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
            rows.foreach(row => {
              dbClient.upsert(
                Map(
                  "time" -> row.getAs[String]("date"),
                  "aid" -> row.getAs[String]("aid")
                ),
                Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
                "common_report"
              )
            })
            dbClient.close()
          })
    

    spark.sql 这里跟PV一样嘛,就是多了一句条件ext.render = 1,但是上面那一大堆是啥子咧?

    莫慌莫慌,大猪 这就慢慢解释道:

    val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
    

    这句的意思就是把我们run类里面创建的log给还原出来,当然啦大家也是可以从参数传递进来的嘛。

    这里的mapPartitions挺有意思的:

    partitionT
        .grouped(1000)
            .flatMap { tList =>
              tList
                .zip(hbaseClient.incrments(tList.map(md5)))
                .map(tp2 => {
                  val log = tp2._1
                  log.copy(ext = EXT(tp2._2))
                })
            }
    

    实际上上面是处理每个分区的数据,也就是转换数据,我们每来一条数据就要去Hbase那incrment一次,返回来的结果就是render,用户今天来了多少次就incrment一次,相应的render的值就会加1。
    那有什么用?我直接从Hbase取出数据,再判断有没有,如果没有这个用户就是今天第一次来,如果有就不是,多简单。其实当初我们也是这么做的,后面发现业务的东西还是放在SQL里面一起写比较好,容易维护,而且incrment好处多多,因为它是带事务的,可以多线程进行修改,结果是一样的。
    而且render = 1的时候是代表UV,如果render = 2的时候又可以代表今天来过两次以上的用户指标,随时扩展,你说咧?

    hbaseClient.incrments
    
    def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
        if (incs.isEmpty) {
          Seq[Long]()
        } else {
          require(incs.head.length == 32, "pk require 32 length")
          val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
          val results = new Array[Object](convertIncs.length)
          table.batch(convertIncs.asJava, results)
          results.array.indices.map(
            ind =>
              Bytes.toLong(
                results(ind)
                  .asInstanceOf[Result]
                  .getValue(
                    Bytes.toBytes(family),
                    Bytes.toBytes(incs(ind).takeRight(24))
                  )
              )
          )
        }
      }
    

    这个方法就是实现了incrment的批量处理,因为我们在线上生产环境的时候测试过,批量处理比单条处理性能高了上百倍,所以这也就是为什么要写在mapPartitions里面的原因了,因为只有在这个方法里面才有批量数据转换操作,foreachPartition是批量处理操作,我们在输出报表到Mysql的地方已经用到了。

    如果我们要关闭程序只需要给redis发一条stop消息就可以啦

    RedisUtil().getResource.publish("computeListenerMessage", "stop")
    

    传送门 完整项目

    写了两个晚上终于写完了。

    相关文章

      网友评论

        本文标题:Spark + Hbase 百亿级流量实时分析统计 之 经典指标

        本文链接:https://www.haomeiwen.com/subject/qvcovqtx.html