Part 10: Manually Maintaining Kafka Offsets in Spark Streaming

Author: 大数据技术与数仓 | Published 2020-11-12 13:23

    Spark Streaming's receiver-less createDirectStream method does not use a receiver; instead it creates an input stream that pulls messages directly from the Kafka cluster nodes. The input stream guarantees that each message pulled from Kafka is transformed exactly once, ensuring consistent semantics. However, when a job fails or is restarted, Spark Streaming's built-in mechanisms alone are not ideal for resuming from the current consumption position (i.e., for exactly-once semantics), so in production the Kafka consumption position is usually maintained by managing offsets manually. This article explains how to manage Kafka offsets by hand and covers the following:

    • Managing Kafka offsets with MySQL
    • Managing Kafka offsets with Redis

    Managing Kafka offsets with MySQL

    We can manage Kafka offsets manually from code in a Spark Streaming application. The offsets can be obtained from the RDDs generated for each processed batch, as follows:

    KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
      // get the offset ranges of this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ...
    }
    

    Once the offsets have been obtained, they can be saved to an external store (MySQL, Redis, ZooKeeper, HBase, etc.).
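
    The natural pattern is to process the batch first and persist the offsets only afterwards, so that a crash between the two steps replays the batch rather than skipping it (at-least-once). A minimal sketch, assuming a hypothetical saveOffsets helper backed by whichever store you choose:

    KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 1. process the batch
      rdd.foreach(record => println(record.value()))
      // 2. persist the offsets only after processing succeeds;
      //    saveOffsets is a hypothetical helper (MySQL, Redis, ...)
      saveOffsets(offsetRanges)
    }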

    Example code

    • MySQL table for storing offsets. The composite primary key (topic, partition, groupid) keeps exactly one row per topic partition per consumer group, which is what allows the REPLACE INTO statement used later to upsert offsets:
    CREATE TABLE `topic_par_group_offset` (
      `topic` varchar(255) NOT NULL,
      `partition` int(11) NOT NULL,
      `groupid` varchar(255) NOT NULL,
      `offset` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`topic`,`partition`,`groupid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
    
    • Configuration constants: ConfigConstants
    object ConfigConstants {
      // Kafka configuration
      val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
      val groupId = "group_test"
      val kafkaTopics = "test"
      val batchInterval = Seconds(5)
      val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
      val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
      val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
      val batchSize = 16384
      val lingerMs = 1
      val bufferMemory = 33554432
      // MySQL configuration
      val user = "root"
      val password = "123qwe"
      val url = "jdbc:mysql://localhost:3306/kafka_offset"
      val driver = "com.mysql.jdbc.Driver"
      // checkpoint configuration
      val checkpointDir = "file:///e:/checkpoint"
      val checkpointInterval = Seconds(10)
      // Redis configuration
      val redisAddress = "192.168.10.203"
      val redisPort = 6379
      val redisAuth = "123qwe"
      val redisTimeout = 3000
    }
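
    These examples pull in a few external libraries: spark-streaming-kafka-0-10 for KafkaUtils, commons-dbcp2 for BasicDataSource, the MySQL JDBC driver, and jedis for the Redis pool. A plausible sbt dependency list is sketched below; the exact versions are assumptions and should be matched to your own environment:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.5",
      "org.apache.commons" % "commons-dbcp2" % "2.7.0",   // BasicDataSource
      "mysql" % "mysql-connector-java" % "5.1.49",        // com.mysql.jdbc.Driver
      "redis.clients" % "jedis" % "3.3.0"                 // Jedis / JedisPool
    )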
    
    • JDBC connection pool utility: JDBCConnPool
    object JDBCConnPool {

      val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
      var dataSource: BasicDataSource = null

      /**
        * Create the data source.
        *
        * @return
        */
      def getDataSource(): BasicDataSource = {
        if (dataSource == null) {
          dataSource = new BasicDataSource()
          dataSource.setDriverClassName(ConfigConstants.driver)
          dataSource.setUrl(ConfigConstants.url)
          dataSource.setUsername(ConfigConstants.user)
          dataSource.setPassword(ConfigConstants.password)
          dataSource.setMaxTotal(50)
          dataSource.setInitialSize(3)
          dataSource.setMinIdle(3)
          dataSource.setMaxIdle(10)
          dataSource.setMaxWaitMillis(2 * 10000)
          dataSource.setRemoveAbandonedTimeout(180)
          dataSource.setRemoveAbandonedOnBorrow(true)
          dataSource.setRemoveAbandonedOnMaintenance(true)
          dataSource.setTestOnReturn(true)
          dataSource.setTestOnBorrow(true)
        }
        dataSource
      }

      /**
        * Close the data source.
        */
      def closeDataSource() = {
        if (dataSource != null) {
          dataSource.close()
        }
      }

      /**
        * Get a database connection.
        *
        * @return
        */
      def getConnection(): Connection = {
        var conn: Connection = null
        try {
          if (dataSource != null) {
            conn = dataSource.getConnection()
          } else {
            conn = getDataSource().getConnection()
          }
        } catch {
          case e: Exception =>
            log.error(e.getMessage(), e)
        }
        conn
      }

      /**
        * Close the statement and connection.
        */
      def closeConnection(ps: PreparedStatement, conn: Connection) {
        if (ps != null) {
          try {
            ps.close()
          } catch {
            case e: Exception =>
              log.error("Failed to close PreparedStatement: " + e.getMessage(), e)
          }
        }
        if (conn != null) {
          try {
            conn.close()
          } catch {
            case e: Exception =>
              log.error("Failed to close Connection: " + e.getMessage(), e)
          }
        }
      }
    }
    
    • Kafka producer: KafkaProducerTest
    object KafkaProducerTest {
      def main(args: Array[String]): Unit = {
        val props: Properties = new Properties()
        props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
        props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
        props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
        props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
        props.put("key.serializer", ConfigConstants.kafkaKeySer)
        props.put("value.serializer", ConfigConstants.kafkaValueSer)
        val producer: Producer[String, String] = new KafkaProducer[String, String](props)
        val startTime: Long = System.currentTimeMillis()
        for (i <- 1 to 100) {
          // send() is asynchronous; buffered records are flushed by close() below
          producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
        }
        println("Elapsed time: " + (System.currentTimeMillis() - startTime))
        producer.close()
      }
    }
    
    • Reading and saving offsets: OffsetReadAndSave

    This object reads offsets from and writes offsets to external stores, namely MySQL and Redis.

    object OffsetReadAndSave {

      /**
        * Get the offsets from MySQL.
        *
        * @param groupid
        * @param topic
        * @return
        */
      def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {

        val conn = JDBCConnPool.getConnection()
        val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
        val ppst = conn.prepareStatement(selectSql)
        ppst.setString(1, groupid)
        ppst.setString(2, topic)

        val result: ResultSet = ppst.executeQuery()

        // offsets keyed by topic partition
        val topicPartitionOffset = mutable.Map[TopicPartition, Long]()

        while (result.next()) {
          val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
          topicPartitionOffset += (topicPartition -> result.getLong("offset"))
        }

        JDBCConnPool.closeConnection(ppst, conn)
        topicPartitionOffset
      }

      /**
        * Get the offsets from Redis.
        *
        * @param groupid
        * @param topic
        * @return
        */
      def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
        val jedis: Jedis = JedisConnPool.getConnection()
        val offsets = mutable.Map[TopicPartition, Long]()

        val key = s"${topic}_${groupid}"
        val fields: java.util.Map[String, String] = jedis.hgetAll(key)
        for (partition <- JavaConversions.mapAsScalaMap(fields)) {
          offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
        }

        offsets.toMap
      }

      /**
        * Write the offsets to MySQL.
        *
        * @param groupid     consumer group ID
        * @param offsetRange offset ranges of the batch
        */
      def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

        val conn = JDBCConnPool.getConnection()
        // REPLACE INTO upserts on the (topic, partition, groupid) primary key
        val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
        val ppst = conn.prepareStatement(insertSql)

        for (offset <- offsetRange) {
          ppst.setString(1, offset.topic)
          ppst.setInt(2, offset.partition)
          ppst.setString(3, groupid)
          ppst.setLong(4, offset.untilOffset)
          ppst.executeUpdate()
        }
        JDBCConnPool.closeConnection(ppst, conn)
      }

      /**
        * Save the offsets to Redis.
        *
        * @param groupid
        * @param offsetRange
        */
      def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
        val jedis: Jedis = JedisConnPool.getConnection()
        for (range <- offsetRange) {
          val topic = range.topic
          val partition = range.partition
          val offset = range.untilOffset
          // key is topic_groupid, field is partition, value is offset
          jedis.hset(s"${topic}_${groupid}", partition.toString, offset.toString)
        }
      }
    }
    
    
    • Business logic

    This object implements the processing logic: it consumes data from Kafka and, after processing, manually saves the offsets to MySQL. On startup, it checks whether offsets already exist in the external store; on a first run it consumes from the earliest position, and if offsets exist it resumes consumption from them.

    What to observe: on first startup the job consumes from the beginning; if you stop the program manually and then start it again, it resumes from the last committed offsets.

    object ManualCommitOffset {

      def main(args: Array[String]): Unit = {

        val brokers = ConfigConstants.kafkaBrokers
        val groupId = ConfigConstants.groupId
        val topics = ConfigConstants.kafkaTopics
        val batchInterval = ConfigConstants.batchInterval

        val conf = new SparkConf()
          .setAppName(ManualCommitOffset.getClass.getSimpleName)
          .setMaster("local[1]")
          .set("spark.serializer", ConfigConstants.sparkSerializer)

        val ssc = new StreamingContext(conf, batchInterval)
        // checkpointing must be enabled, otherwise updateStateByKey below fails
        ssc.checkpoint(ConfigConstants.checkpointDir)

        ssc.sparkContext.setLogLevel("OFF")
        // create the direct Kafka stream from the brokers and topics
        val topicSet = topics.split(" ").toSet

        // Kafka connection parameters
        val kafkaParams = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
          ConsumerConfig.GROUP_ID_CONFIG -> groupId,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
        )

        // read this group's partition offsets for the topic from MySQL
        val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
        var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

        // if offsets already exist in MySQL, resume consumption from them
        if (offsetMap.size > 0) {
          println("Offsets found; resuming consumption from them!")

          inputDStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

        } else {
          // if no offsets exist in MySQL, consume from the earliest position
          inputDStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
        }
        // checkpoint interval; must be an integer multiple of batchInterval
        inputDStream.checkpoint(ConfigConstants.checkpointInterval)

        // holds the offsets of the current batch
        var offsetRanges = Array[OffsetRange]()
        // capture the offsets of the current DStream
        val transformDS = inputDStream.transform { rdd =>
          // get the offsets
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }

        /**
          * State update function.
          * @param newValues  new values for the key
          * @param stateValue current state value
          * @return
          */
        def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
          var oldValue = stateValue.getOrElse(0) // current state
          // fold the new values into the state
          for (newValue <- newValues) {
            oldValue += newValue
          }
          // return the updated state
          Option(oldValue)
        }
        // business logic
        // this example aggregates the message values per key, to verify that
        // consumption resumes from the already committed offsets
        transformDS.map(msg => ("spark", msg.value().toInt)).updateStateByKey(updateFunc).print()

        // print the offsets and records to observe the output
        transformDS.foreachRDD { (rdd, time) =>
          // print every record of the RDD
          rdd.foreach { record =>
            println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
          }
          // print the consumed offset ranges
          for (o <- offsetRanges) {
            println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
          }

          // save the offsets to MySQL
          OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Managing Kafka offsets with Redis

    • Redis connection pool: JedisConnPool
    object JedisConnPool {
      val config = new JedisPoolConfig
      // maximum number of connections
      config.setMaxTotal(60)
      // maximum number of idle connections
      config.setMaxIdle(10)
      config.setTestOnBorrow(true)

      // server address
      val redisAddress: String = ConfigConstants.redisAddress
      // port
      val redisPort: Int = ConfigConstants.redisPort
      // password
      val redisAuth: String = ConfigConstants.redisAuth
      // maximum time to wait for an available connection
      val redisTimeout: Int = ConfigConstants.redisTimeout

      val pool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

      def getConnection(): Jedis = {
        pool.getResource
      }
    }
    
    • Business logic

    This object is essentially the same as the one above, except that it stores offsets in Redis. The Redis data type used is a Hash, with the layout [key field value] -> [topic_groupid partition offset], i.e. the key is topic_groupid, the field is the partition, and the value is the offset.
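
    For illustration, reading that hash back for topic test and consumer group group_test might look like the snippet below (the partition numbers and offsets in the comment are made-up values):

    val jedis = JedisConnPool.getConnection()
    // HGETALL test_group_test  ->  { "0" -> "42", "1" -> "37" }
    val fields = jedis.hgetAll("test_group_test")
    for ((partition, offset) <- JavaConversions.mapAsScalaMap(fields)) {
      println(s"partition=$partition offset=$offset")
    }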

    object ManualCommitOffsetToRedis {

      def main(args: Array[String]): Unit = {

        val brokers = ConfigConstants.kafkaBrokers
        val groupId = ConfigConstants.groupId
        val topics = ConfigConstants.kafkaTopics
        val batchInterval = ConfigConstants.batchInterval

        val conf = new SparkConf()
          .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
          .setMaster("local[1]")
          .set("spark.serializer", ConfigConstants.sparkSerializer)

        val ssc = new StreamingContext(conf, batchInterval)
        // checkpointing must be enabled, otherwise updateStateByKey below fails
        ssc.checkpoint(ConfigConstants.checkpointDir)

        ssc.sparkContext.setLogLevel("OFF")
        // create the direct Kafka stream from the brokers and topics
        val topicSet = topics.split(" ").toSet

        // Kafka connection parameters
        val kafkaParams = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
          ConsumerConfig.GROUP_ID_CONFIG -> groupId,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
        )

        // read this group's partition offsets for the topic from Redis
        val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
        var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

        // if offsets already exist in Redis, resume consumption from them
        if (offsetMap.size > 0) {
          println("Offsets found; resuming consumption from them!")

          inputDStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

        } else {
          // if no offsets exist in Redis, consume from the earliest position
          inputDStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
        }
        // checkpoint interval; must be an integer multiple of batchInterval
        inputDStream.checkpoint(ConfigConstants.checkpointInterval)

        // holds the offsets of the current batch
        var offsetRanges = Array[OffsetRange]()
        // capture the offsets of the current DStream
        val transformDS = inputDStream.transform { rdd =>
          // get the offsets
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }

        /**
          * State update function.
          *
          * @param newValues  new values for the key
          * @param stateValue current state value
          * @return
          */
        def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
          var oldValue = stateValue.getOrElse(0) // current state
          // fold the new values into the state
          for (newValue <- newValues) {
            oldValue += newValue
          }
          // return the updated state
          Option(oldValue)
        }
        // business logic
        // this example aggregates the message values per key, to verify that
        // consumption resumes from the already committed offsets
        transformDS.map(msg => ("spark", msg.value().toInt)).updateStateByKey(updateFunc).print()

        // print the offsets and records to observe the output
        transformDS.foreachRDD { (rdd, time) =>
          // print every record of the RDD
          rdd.foreach { record =>
            println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
          }
          // print the consumed offset ranges
          for (o <- offsetRanges) {
            println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
          }

          // save the offsets to Redis
          OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Summary

    This article showed how to store Kafka consumption positions in an external system, with complete code examples for managing offsets in MySQL and Redis. Of course, many other stores can serve the same purpose; you can just as well manage offsets with ZooKeeper, HBase, and so on, and the basic approach is much the same in each case.
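
    Since only the read and save steps differ between backends, the shared shape can be captured in a small trait. This is just a sketch of the pattern, not code from the examples above:

    trait OffsetStore {
      def read(groupid: String, topic: String): Map[TopicPartition, Long]
      def save(groupid: String, ranges: Array[OffsetRange]): Unit
    }
    // a ZooKeeper- or HBase-backed store only needs to implement these two
    // methods; the streaming job itself stays unchanged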
