美文网首页玩转大数据大数据大数据,机器学习,人工智能
Spark + Hbase 百亿级流量实时分析统计 之 小巧高性

Spark + Hbase 百亿级流量实时分析统计 之 小巧高性

作者: 大猪大猪 | 来源:发表于2019-03-25 23:32 被阅读11次

    在上一篇文章 大猪 已经介绍了日志存储设计方案 ,我们数据已经落地到数据中心上了,那接下来如何ETL呢?毕竟可是生产环境级别的,可不能乱来。其实只要解决几个问题即可,不必要引入很大级别的组件来做,当然了各有各的千秋,本文主要从 易懂小巧简洁高性能 这三个方面去设计出发点,顺便还实现了一个精巧的 Filebeat

    设计

    loghub功能
    要实现的功能就是扫描每天的增量日志并写入Hbase中

    需要攻克如下几个小难题

    1. 需要把文件中的每一行数据都取出来
    2. 能处理超过10G以上的大日志文件,并且只能占用机器一定的内存,越小越好
    3. 从上图可以看到 标黄 的是已经写入Hbase的数据,不能重复读取
    4. 非活跃文件不能扫,因为文件过多会影响整体读取IO性能
    5. 读取中的过程要保证增量数据不能录入,因为要保证offset的时候写入mysql稳定不跳跃

    实现

    大猪 根据线上的生产环境一一把上面的功能重新分析给实现一下。

    从第一点看还是比较简单的嘛?但是我们要结合上面的 5 个问题来看才行。

    总结一句话就是:要实现一个高性能而且能随时重启继续工作的 loghub ETL 程序

    实际也必需这样做,因为生产环境容不得马虎,不然就等着被BOSS

    实现过程

    需要有一个读取所有日志文件方法

    def files(file: File, filter: File => Boolean): Seq[File] = {
      val listFiles = file.listFiles()
      listFiles.filter(filter) ++ listFiles.filter(_.isDirectory).flatMap(f => files(f, filter))
    }
    

    还要实现一个保存并读取文件进度的方法

    //读取索引文件
    def seeks(): util.Map[String, Long] = {
        val seekStr = Source.fromFile(seekFile)
          .getLines.mkString
        if (seekStr.nonEmpty && seekStr != "") {
          read[Map[String, Long]](seekStr)
            .asJava
        } else new util.HashMap[String, Long]()
      }
    //读取单个文件索引
    def readSeek(filePath: String): Long = {
        val fileMd5 = MD5Hash.getMD5AsHex(filePath.getBytes)
        val list = seeks().asScala.filter(_._1.equals(fileMd5))
        if (list.isEmpty) {
          writeSeek(Map(filePath -> 0L).asJava)
          0L
        } else list.head._2.toLong
      }
    //把文件索引更新到索引文件
    def writeSeek(filePaths: util.Map[String, Long]): Unit = synchronized {
        val writer = new PrintWriter(seekFile)
        val convertList = filePaths.asScala.map(x => MD5Hash.getMD5AsHex(x._1.getBytes) -> x._2)
        val sets = offsets.asScala ++ convertList
        val seeksStr = write(sets)
        writer.write(seeksStr)
        writer.flush()
        writer.close()
        offsets.putAll(convertList.asJava)
      }
    

    由于不能把一个日志文件全部读入内存进行处理
    所以还需要一个能根据索引一行一行接着读取数据的方法

    def lines(file: File, startSeek: Long, endSeek: Long, finish: () => Unit): Iterator[String] = {
        new Iterator[String] {
          //使用java的随机文件读写类,性能非常高
          var rfile = new RandomAccessFile(file, "r")
          //设置上次读取的索引结束位置
          rfile.seek(startSeek)
          var nextLine: String = _
          var readSeek: Long = 0
          def appendSeek(): Unit = {
            if (nextLine != null) {
              nextLine = new String(nextLine.getBytes("ISO-8859-1"), "utf-8")
              readSeek += nextLine.getBytes.length
            }
          }
          override def hasNext: Boolean = {
            if (rfile == null) return false
            nextLine = rfile.readLine()
            appendSeek()
            val hl = nextLine != null && readSeek <= (endSeek - startSeek)
            if (!hl) {
              rfile.close()
              rfile = null
              finish()
            }
            hl
          }
          override def next(): String = {
            readSeek += 1 //append '\n' byte length
            nextLine
          }
        }
      }
    

    还有一个Hbase的连接池小工具

    object HbasePool {
      println("connecting.")
      private val connection: Connection = ConnectionFactory.createConnection(ConfUtil.createConf)
      println("connected.")
      //初始化好10个连接等待使用
      private val poolSize = 10
      private val pools = new util.HashMap[Int, BufferedMutator]()
    
      (0 until poolSize).foreach(
        x => {
          println("table connecting.")
          val params = new BufferedMutatorParams(TableName.valueOf("test_arc"))
          //3秒的提交数据间隔,如果程序很快请把这个值改小一点
          params.setWriteBufferPeriodicFlushTimeoutMs(TimeUnit.SECONDS.toMillis(3))
          params.setWriteBufferPeriodicFlushTimerTickMs(100)
          params.maxKeyValueSize(10485760)
          //客户端2M提交缓存
          params.writeBufferSize(1024 * 1024 * 2)
          val table = connection.getBufferedMutator(params)
    
          pools.put(x, table)
          println("table connected.")
        }
      )
    
      def getTable: BufferedMutator = {
        pools.get(Random.nextInt(poolSize))
      }
    }
    

    几个核心方法已经写完了,接着是我们的主程序

    def run(logPath: File, defaultOffsetDay: String): Unit = {
        val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
        val offsetDay = Option(if (sdfstr == "") null else sdfstr)
        
        //读取设置读取日期的倒数一天之后的日期文件夹
        val noneOffsetFold = logPath
          .listFiles()
          .filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
          .sortBy(f => LocalDate.parse(f.getName).toEpochDay)
    
        //读取文件夹中的所有日志文件,并取出索引进行匹配
        val filesPar = noneOffsetFold
          .flatMap(files(_, file => file.getName.endsWith(".log")))
          .map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length()))
          .filter(tp2 => {
            //过滤出新文件,与有增量的日志文件
            val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())
            val result = offsets.asScala.filter(m => fileMd5.equals(m._1))
            result.isEmpty || tp2._3 > result.head._2
          })
          .par
    
        filesPar.tasksupport = pool
    
        val willUpdateOffset = new util.HashMap[String, Long]()
        val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
        var logTime:String = null
        filesPar
          .foreach(tp3 => {
            val hbaseClient = HbasePool.getTable
            //因为不能全量读取数据,所有只能一条一条读取,批量提出交给HbaseClient的客户端的mutate方式优雅处理
            //foreach 里面的部分就是我们的业务处理部分
            lines(tp3._1, tp3._2, tp3._3, () => {
              willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
              offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
            })
              .foreach(line => {
                val jsonObject = parse(line)
                val time = (jsonObject \ "time").extract[Long]
                val data = jsonObject \ "data"
                val dataMap = data.values.asInstanceOf[Map[String, Any]]
                  .filter(_._2 != null)
                  .map(x => x._1 -> x._2.toString)
    
                val uid = dataMap("uid")
                logTime = time.getLocalDateTime.toString
                val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)
    
                val row = new Put(Bytes.toBytes(rowkey))
                dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
                hbaseClient.mutate(row)
              })
            hbaseClient.flush()
          })
        //更新索引到文件上
        writeSeek(willUpdateOffset)
        //更新索引日期到文件上
        writeSeekDay(noneOffsetFold.last.getName)
        //把 logTime offset 写到mysql中,方便Spark+Hbase程序读取并计算
      }
    

    程序很精简,没有任何没用的功能在里面,线上的生产环境就应该是这子的了。
    大家还可以根据需求加入程序退出发邮件通知功能之类的。
    真正去算了一下也就100行功能代码,而且占用极小的内存,都不到100M,很精很精。

    传送门 完整ETL程序源码

    相关文章

      网友评论

        本文标题:Spark + Hbase 百亿级流量实时分析统计 之 小巧高性

        本文链接:https://www.haomeiwen.com/subject/bpwbvqtx.html