Spark Streaming: Graceful Shutdown

Author: 阿坤的博客 | Published 2018-07-26 13:49

    This post documents how to read data from Kafka with Spark Streaming, save the consumer offsets in Redis, and watch for a specific key in Redis to decide when to stop the application.

    Related posts:
    1. Spark: Pi (local mode)
    2. Spark: WordCount on a cluster
    3. Spark Streaming: reading data from Kafka
    4. Spark Streaming: saving Kafka offsets with Redis
    5. Spark Streaming: graceful shutdown
    6. Spark Streaming: writing data to Kafka
    7. Using Spark to build a word cloud from short reviews of 《西虹市首富》 (Hello Mr. Billionaire)

    1. Monitor whether a key exists in Redis

    /**
      * Gracefully stop the Streaming application
      *
      * @param ssc the StreamingContext to stop
      */
    def stopByMarkKey(ssc: StreamingContext): Unit = {
      val intervalMills = 10 * 1000 // check for the stop flag every 10 seconds
      var isStop = false
      while (!isStop) {
        isStop = ssc.awaitTerminationOrTimeout(intervalMills)
        if (!isStop && isExists(STOP_FLAG)) {
          LOG.warn("Shutting down the Spark Streaming application in 2 seconds...")
          Thread.sleep(2000)
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }

    /**
      * Check whether a key exists in Redis
      *
      * @param key the key to check
      * @return true if the key exists
      */
    def isExists(key: String): Boolean = {
      val jedis = InternalRedisClient.getPool.getResource
      val flag = jedis.exists(key)
      jedis.close()
      flag
    }
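
    Both helpers rely on InternalRedisClient, a small Jedis connection-pool wrapper (imported as me.jinkun.scala.util.InternalRedisClient in the full program below) whose source is not included in this post. Below is a minimal sketch of what such a wrapper might look like, assuming the jedis client library is on the classpath; the real implementation may differ.

    import redis.clients.jedis.{JedisPool, JedisPoolConfig}

    // Sketch only: a plausible shape for the pool wrapper used above; the real
    // me.jinkun.scala.util.InternalRedisClient may differ.
    object InternalRedisClient {
      private var pool: JedisPool = _

      def makePool(host: String, port: Int, timeout: Int,
                   maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = synchronized {
        if (pool == null) {
          val config = new JedisPoolConfig
          config.setMaxTotal(maxTotal)
          config.setMaxIdle(maxIdle)
          config.setMinIdle(minIdle)
          pool = new JedisPool(config, host, port, timeout)
          sys.addShutdownHook(pool.destroy()) // release connections when the JVM exits
        }
      }

      def getPool: JedisPool = {
        assert(pool != null, "makePool must be called before getPool")
        pool
      }
    }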
    

    2. KafkaRedisStreaming

    package me.jinkun.stream
    
    import me.jinkun.scala.util.InternalRedisClient
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import org.slf4j.LoggerFactory
    
    /**
      * Reads data from Kafka, keeps consumer offsets in Redis, and stops
      * gracefully once the STOP_FLAG key appears in Redis.
      */
    object KafkaRedisStreaming {
      private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")
    
      private val STOP_FLAG = "TEST_STOP_FLAG"
    
      def initRedisPool() = {
        // Redis configurations
        val maxTotal = 20
        val maxIdle = 10
        val minIdle = 1
        val redisHost = "47.98.119.122"
        val redisPort = 6379
        val redisTimeout = 30000
        InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
      }
    
      /**
        * Get the last committed offset of each partition of a topic from Redis
        *
        * @param topicName  the Kafka topic
        * @param partitions the number of partitions of the topic
        * @return a map from TopicPartition to the offset to resume from
        */
      def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
        if (LOG.isInfoEnabled())
          LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)
    
        // Read the offsets saved by the previous run from Redis
        val jedis = InternalRedisClient.getPool.getResource
        val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
        for (partition <- 0 to partitions - 1) {
          val topic_partition_key = topicName + "_" + partition
          val lastSavedOffset = jedis.get(topic_partition_key)
          val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
          fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
        }
        jedis.close()
    
        fromOffsets.toMap
      }
    
      def main(args: Array[String]): Unit = {
        // Initialize the Redis connection pool
        initRedisPool()
    
        val conf = new SparkConf()
          .setAppName("ScalaKafkaStream")
          .setMaster("local[2]")
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        val ssc = new StreamingContext(sc, Seconds(3))
    
        val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
        val groupId = "kafka-test-group"
        val topicName = "Test"
        val maxPoll = 20000
    
        val kafkaParams = Map(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
          ConsumerConfig.GROUP_ID_CONFIG -> groupId,
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )
    
        // Load the saved offsets; the topic's total partition count (3) is specified here
        val fromOffsets = getLastCommittedOffsets(topicName, 3)
    
        // Create the direct Kafka DStream, explicitly assigned to the saved offsets
        val kafkaTopicDS =
          KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
    
        kafkaTopicDS.foreachRDD(rdd => {
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
          // Only process non-empty RDDs
          if (!rdd.isEmpty()) {
            val jedis = InternalRedisClient.getPool.getResource
            val p = jedis.pipelined()
            p.multi() // open a Redis transaction
    
            // Process the batch: word count, sorted by frequency
            rdd
              .map(_.value)
              .flatMap(_.split(" "))
              .map(x => (x, 1L))
              .reduceByKey(_ + _)
              .sortBy(_._2, false)
              .foreach(println)
    
            // Update the saved offsets
            offsetRanges.foreach { offsetRange =>
              println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
              val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
              p.set(topic_partition_key, offsetRange.untilOffset + "")
            }
    
            p.exec() // commit the transaction
            p.sync() // flush the pipeline and read the responses
            jedis.close()
          }
        })
    
        ssc.start()
    
        // Block here, polling for the stop flag
        stopByMarkKey(ssc)
    
        ssc.awaitTermination()
      }
    
      /**
        * Gracefully stop the Streaming application once the stop flag appears
        *
        * @param ssc the StreamingContext to stop
        */
      def stopByMarkKey(ssc: StreamingContext): Unit = {
        val intervalMills = 10 * 1000 // check for the stop flag every 10 seconds
        var isStop = false
        while (!isStop) {
          isStop = ssc.awaitTerminationOrTimeout(intervalMills)
          if (!isStop && isExists(STOP_FLAG)) {
            LOG.warn("Shutting down the Spark Streaming application in 2 seconds...")
            Thread.sleep(2000)
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
        }
      }
    
      /**
        * Check whether a key exists in Redis
        *
        * @param key the key to check
        * @return true if the key exists
        */
      def isExists(key: String): Boolean = {
        val jedis = InternalRedisClient.getPool.getResource
        val flag = jedis.exists(key)
        jedis.close()
        flag
      }
    }
    

    To stop the job, simply create the TEST_STOP_FLAG key in Redis:

    set "TEST_STOP_FLAG" 1
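
    The flag can also be set from code instead of redis-cli. A small sketch, reusing the same connection pool (the helper name requestStop is not part of the original post):

    // Hypothetical helper (not in the original post): set the stop flag from code
    // instead of redis-cli, reusing the same connection pool.
    def requestStop(): Unit = {
      val jedis = InternalRedisClient.getPool.getResource
      try jedis.set("TEST_STOP_FLAG", "1") finally jedis.close()
    }

    Note that the job never deletes the flag itself, so remove the key (del TEST_STOP_FLAG in redis-cli) before restarting, otherwise the next run will shut itself down at its first check.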
    

    Run result: [screenshot]
    This way, stopping the Spark Streaming application no longer causes data loss: the stop is graceful, so in-flight batches are allowed to finish, and the offsets are only written to Redis after a batch has been processed.
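
    One way to check what was committed is to read the saved offsets back from Redis after the job stops. A quick sketch, assuming the three partitions configured above (the key format comes from getLastCommittedOffsets):

    // The job stores offsets under "<topic>_<partition>", i.e. Test_0 .. Test_2.
    val jedis = InternalRedisClient.getPool.getResource
    (0 until 3).foreach { partition =>
      println(s"Test_$partition -> ${jedis.get(s"Test_$partition")}")
    }
    jedis.close()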
