Spark Streaming: saving Kafka offsets to HBase

Author: 码仙丶 | Published 2018-09-08 14:18

    I used to store offsets in ZooKeeper, but a streaming job issues a heavy stream of reads against ZK, so ZK is a poor fit for offset storage. I have since moved all offsets into HBase.

    • Kafka version: 0.10.2.0

    • Spark version: 2.0

    • HBase layout: the consumer group id is the table name, the topic name is the rowkey, the column family is info, and the partition number is the column qualifier

    kafka_offset:groupid    info:0    info:1    info:2
    rowkey (topicName)      10000     10000     10000
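
    Note that createTable below does not create the kafka_offset namespace, and HBase refuses to create a table in a namespace that does not exist. A minimal sketch of creating the namespace once via the Admin API (assumes HbaseUtil.setConf has already been called):

    import org.apache.hadoop.hbase.NamespaceDescriptor

    //create the kafka_offset namespace if it is missing
    if (!HbaseUtil.admin.listNamespaceDescriptors().exists(_.getName == "kafka_offset")) {
      HbaseUtil.admin.createNamespace(NamespaceDescriptor.create("kafka_offset").build())
    }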
    1. Code invoked in the streaming job after the change
    //initialize the HBase connection object first
    HbaseUtil.setConf("zk address", "zk port")
    //HBase namespace and table that hold the offsets
    val offsetTbName = "kafka_offset:groupId"
    HbaseUtil.createTable(offsetTbName, "info") //create the table if it does not exist in HBase
    //read the topic/partition offsets from HBase; missing entries are fine (first run with this group id)
    val fromOffsets: Map[TopicPartition, Long] = OffsetUtil.getFromOffsets("groupId")
    /**
    * param offsets :
    * offsets to begin at on initial startup.  If no offset is given for a
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    * Quoting the source comment above: if no offset can be fetched for a TopicPartition,
    * auto.offset.reset decides whether reading starts from earliest or latest, so an empty
    * fromOffsets on the first run is nothing to worry about.
    */
    //create the Kafka stream
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
        ssc, //StreamingContext
        PreferConsistent,
        Subscribe[String, String](topicsSet, kafkaParams, fromOffsets)
    )
    
    kafkaStream.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //business logic omitted
        //commit the offsets to HBase
        OffsetUtil.saveOffsetToHbase(offsetRanges, "groupId")
    })
    
    HbaseUtil.close()
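
    The snippet above references kafkaParams, topicsSet and ssc without defining them. A minimal sketch of the Kafka parameters, with placeholder broker and topic names; enable.auto.commit is switched off because offsets are committed to HBase by hand:

    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092", //placeholder broker list
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "groupId",
      "auto.offset.reset" -> "earliest", //only consulted when HBase holds no offsets yet
      "enable.auto.commit" -> (false: java.lang.Boolean) //offsets go to HBase instead of Kafka
    )
    val topicsSet = Set("topicName") //placeholder topic names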
    
    • The HbaseUtil.scala and OffsetUtil.scala used above are attached below
    1. HbaseUtil.scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.mutable

    object HbaseUtil {
      var conf: Configuration = _
      //shared connection, created lazily; an HBase Connection is heavyweight and thread-safe
      lazy val connection: Connection = ConnectionFactory.createConnection(conf)
      lazy val admin: Admin = connection.getAdmin
    
      /**
        * set the HBase conf
        *
        * @param quorum ZooKeeper quorum used by HBase
        * @param port   ZooKeeper client port, e.g. 2181
        */
      def setConf(quorum: String, port: String): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", quorum)
        conf.set("hbase.zookeeper.property.clientPort", port)
        this.conf = conf
      }
    
      /**
        * Create the table if it does not already exist
        * @param tableName namespace:tableName
        * @param columnFamily column family
        */
      def createTable(tableName: String, columnFamily: String): Unit = {
        val tbName = TableName.valueOf(tableName)
        if (!admin.tableExists(tbName)) {
          val htableDescriptor = new HTableDescriptor(tbName)
          val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
          htableDescriptor.addFamily(hcolumnDescriptor)
          admin.createTable(htableDescriptor)
        }
      }
    
      /**
        * Read the cells of a row from HBase
        * @param tableName namespace:tableName
        * @param rowKey rowkey
        * @return the row's cells as a Buffer (the row must exist, otherwise listCells() returns null)
        */
      def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
        val get = new Get(Bytes.toBytes(rowKey))
        val table = connection.getTable(TableName.valueOf(tableName))
        val result: Result = table.get(get)
        table.close()
        import scala.collection.JavaConverters._
        result.listCells().asScala
      }
    
      /**
        * Put a single cell
        * @param tableName namespace:tableName
        * @param rowKey rowkey
        * @param family column family
        * @param qualifier column qualifier
        * @param value cell value
        */
      def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
        //single put
        val put: Put = new Put(Bytes.toBytes(rowKey)) //the argument is the row key
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    
        //get the table object
        val table: Table = connection.getTable(TableName.valueOf(tableName))
        table.put(put)
        table.close()
      }
    
      def close(): Unit = {
        admin.close()
        connection.close()
      }
    }
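
    A quick usage sketch of these helpers on their own (ZooKeeper address and names are placeholders), writing and reading back one offset cell in the layout described above:

    HbaseUtil.setConf("zk1,zk2,zk3", "2181")
    HbaseUtil.createTable("kafka_offset:groupId", "info")
    //rowkey = topic, qualifier = partition, value = offset
    HbaseUtil.singlePut("kafka_offset:groupId", "topicName", "info", "0", "10000")
    HbaseUtil.getCell("kafka_offset:groupId", "topicName").foreach(cell =>
      println(Bytes.toString(CellUtil.cloneQualifier(cell)) + " -> " + Bytes.toString(CellUtil.cloneValue(cell))))
    HbaseUtil.close()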
    
    2. OffsetUtil.scala
    import org.apache.hadoop.hbase.{CellUtil, TableName}
    import org.apache.hadoop.hbase.client.{Get, Table}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange

    object OffsetUtil {
      //read the saved offsets from HBase for the given consumer group
      def getFromOffsets(groupId: String): Map[TopicPartition, Long] = {
        var fromOffsets: Map[TopicPartition, Long] = Map()
        //AnalysisParam.topicSet: the job's configured set of topics
        AnalysisParam.topicSet.foreach(topic => {
          val get = new Get(Bytes.toBytes(topic))
          val table: Table = HbaseUtil.connection.getTable(TableName.valueOf(s"kafka_offset:$groupId"))
          if (table.exists(get)) {
            val cells = HbaseUtil.getCell(s"kafka_offset:$groupId", topic)
            cells.foreach(cell => {
              val partition = Bytes.toString(CellUtil.cloneQualifier(cell))
              val offset = Bytes.toString(CellUtil.cloneValue(cell))
              val tp = new TopicPartition(topic, partition.toInt)
              fromOffsets += (tp -> offset.toLong)
            })
          }
          table.close()
        })
        fromOffsets
      }
    
      //save the offsets to HBase
      def saveOffsetToHbase(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
        offsetRanges.foreach(o => {
          val topic = o.topic
          val partition = o.partition
          //store untilOffset (where the next batch should resume); storing fromOffset would replay the last batch on restart
          val offset = o.untilOffset
          HbaseUtil.singlePut(s"kafka_offset:$groupId", topic, "info", partition.toString, offset.toString)
        })
      }
    }
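
    Since offsets reach HBase only after the batch's logic has run, a crash between processing and saveOffsetToHbase replays that batch on restart, i.e. at-least-once delivery. A minimal round-trip sketch of the two objects together (placeholder ZooKeeper address; assumes this group id already has saved offsets):

    HbaseUtil.setConf("zk address", "2181")
    OffsetUtil.getFromOffsets("groupId").foreach { case (tp, offset) =>
      println(s"${tp.topic}-${tp.partition} resumes at $offset")
    }
    HbaseUtil.close()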
    
