Spark Streaming: Writing Data to Kafka

Author: 阿坤的博客 | Published 2018-07-26 13:51

    This post shows how to use Spark Streaming to read data from Kafka, save the consumed offsets in Redis, watch a flag key in Redis to stop the application gracefully, and write the processed records back to Kafka as JSON.

    Related articles:
    1. Spark: Computing Pi Locally
    2. Spark: WordCount on a Cluster
    3. Spark Streaming: Reading Data from Kafka
    4. Spark Streaming: Saving Kafka Offsets in Redis
    5. Spark Streaming: Graceful Shutdown
    6. Spark Streaming: Writing Data to Kafka
    7. Spark: A Word Cloud of Short Reviews of 《西虹市首富》

    KafkaSink

    A thin wrapper around KafkaProducer that makes it safe to broadcast to the executors.

    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
    
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
    
      def send(topic: String, key: K, value: V): Future[RecordMetadata] = {
        producer.send(new ProducerRecord[K, V](topic, key, value))
      }
    
      def send(topic: String, value: V): Future[RecordMetadata] = {
        producer.send(new ProducerRecord[K, V](topic, value))
      }
    }
    
    object KafkaSink {
    
      import scala.collection.JavaConversions._
    
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
    
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
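
    For reference, here is a minimal, non-Spark usage sketch of the wrapper above, using the Map-based apply overload; the broker address and topic are placeholders:

    import org.apache.kafka.common.serialization.StringSerializer

    object KafkaSinkLocalDemo {
      def main(args: Array[String]): Unit = {
        // Placeholder broker address; replace with a real bootstrap.servers list
        val sink = KafkaSink[String, String](Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.serializer"    -> classOf[StringSerializer].getName,
          "value.serializer"  -> classOf[StringSerializer].getName))

        // send() returns a java.util.concurrent.Future[RecordMetadata]; get() blocks until the broker acks
        val metadata = sink.send("Test_Json", """{"hello":"world"}""").get()
        println(s"Written to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
      }
    }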
    

    Initialize the KafkaSink and broadcast it

    // Initialize the KafkaSink and broadcast it to the executors
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      if (LOG.isInfoEnabled)
        LOG.info("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }
    

    Iterate over each partition and send the records to Kafka through the broadcast variable

    // Send each record to Kafka through the broadcast KafkaSink
    partition.foreach(record => {
      kafkaProducer.value.send("Test_Json", new Gson().toJson(record))
    })
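
    In the full program below, the records reaching this point are (word, count) tuples, so Gson serializes the tuple's fields by reflection. A rough sketch of the resulting payload (the exact field names depend on Gson's handling of scala.Tuple2):

    import com.google.gson.Gson

    object GsonTupleDemo {
      def main(args: Array[String]): Unit = {
        // Gson reflects over Tuple2's _1/_2 fields, so a (word, count) record
        // typically serializes to something like {"_1":"spark","_2":3}
        println(new Gson().toJson(("spark", 3L)))
      }
    }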
    

    Full program: Kafka2KafkaStreaming

    import java.util.Properties

    import com.google.gson.Gson
    import me.jinkun.scala.util.{InternalRedisClient, KafkaSink}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import org.slf4j.LoggerFactory
    
    /**
      * Reads messages from the Test topic, counts words, writes the counts as JSON
      * to the Test_Json topic, and saves the consumed offsets in Redis.
      */
    object Kafka2KafkaStreaming {
      private val LOG = LoggerFactory.getLogger("Kafka2KafkaStreaming")
    
      private val STOP_FLAG = "TEST_STOP_FLAG"
    
      def initRedisPool() = {
        // Redis configurations
        val maxTotal = 20
        val maxIdle = 10
        val minIdle = 1
        val redisHost = "47.98.119.122"
        val redisPort = 6379
        val redisTimeout = 30000
        InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
      }
    
      /**
        * Reads the last committed offset for each partition of the topic from Redis.
        *
        * @param topicName  the Kafka topic to look up
        * @param partitions the total number of partitions of the topic
        * @return a map from TopicPartition to the offset to resume from
        */
      def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
        if (LOG.isInfoEnabled())
          LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)
    
        // Read the offset previously saved for each partition from Redis
        val jedis = InternalRedisClient.getPool.getResource
        val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
        for (partition <- 0 until partitions) {
          val topic_partition_key = topicName + "_" + partition
          val lastSavedOffset = jedis.get(topic_partition_key)
          val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
          fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
        }
        jedis.close()
    
        fromOffsets.toMap
      }
    
      def main(args: Array[String]): Unit = {
        // Initialize the Redis connection pool
        initRedisPool()
    
        val conf = new SparkConf()
          .setAppName("ScalaKafkaStream")
          .setMaster("local[3]")
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        val ssc = new StreamingContext(sc, Seconds(3))
    
        val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
        val groupId = "kafka-test-group"
        val topicName = "Test"
        val maxPoll = 1000
    
        val kafkaParams = Map(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
          ConsumerConfig.GROUP_ID_CONFIG -> groupId,
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )
    
        // The total number of partitions of the Test topic (3 here) is specified explicitly
        val fromOffsets = getLastCommittedOffsets(topicName, 3)
    
        // Create the direct Kafka DStream, resuming from the offsets stored in Redis
        val kafkaTopicDS = KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
    
        // Initialize the KafkaSink and broadcast it to the executors
        val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
          val kafkaProducerConfig = {
            val p = new Properties()
            p.setProperty("bootstrap.servers", bootstrapServers)
            p.setProperty("key.serializer", classOf[StringSerializer].getName)
            p.setProperty("value.serializer", classOf[StringSerializer].getName)
            p
          }
          if (LOG.isInfoEnabled)
            LOG.info("kafka producer init done!")
          ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
        }
    
    
        kafkaTopicDS.foreachRDD(rdd => {
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
          // Only process non-empty RDDs
          if (!rdd.isEmpty()) {
            // Count words and process each partition of the result
            rdd
              .map(_.value())
              .flatMap(_.split(" "))
              .map(x => (x, 1L))
              .reduceByKey(_ + _)
              .foreachPartition(partition => {
    
                val jedis = InternalRedisClient.getPool.getResource
                val p = jedis.pipelined()
                p.multi() // start a Redis transaction on the pipeline
    
                // Send each (word, count) record to Kafka through the broadcast KafkaSink
                partition.foreach(record => {
                  kafkaProducer.value.send("Test_Json", new Gson().toJson(record))
                })
    
                // NOTE: indexing offsetRanges by partitionId assumes one RDD partition
                // per Kafka partition; after reduceByKey this 1:1 mapping is not guaranteed.
                val offsetRange = offsetRanges(TaskContext.get.partitionId)
                println("partition: " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
                val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
                p.set(topic_partition_key, offsetRange.untilOffset.toString)
    
                p.exec() // commit the transaction
                p.sync() // flush the pipeline and read the responses
                jedis.close()
              })
          }
        })
    
        ssc.start()
    
        // Gracefully stop when the stop flag appears in Redis
        stopByMarkKey(ssc)
    
        ssc.awaitTermination()
      }
    
      /**
        * Gracefully stops the StreamingContext once the stop flag key appears in Redis.
        *
        * @param ssc the StreamingContext to stop
        */
      def stopByMarkKey(ssc: StreamingContext): Unit = {
        val intervalMills = 10 * 1000 // check every 10 seconds whether the stop flag exists
        var isStop = false
        while (!isStop) {
          isStop = ssc.awaitTerminationOrTimeout(intervalMills)
          if (!isStop && isExists(STOP_FLAG)) {
            LOG.warn("2秒后开始关闭sparstreaming程序.....")
            Thread.sleep(2000)
            ssc.stop(true, true)
          }
        }
      }
    
      /**
        * Checks whether the given key exists in Redis.
        *
        * @param key the Redis key to check
        * @return true if the key exists
        */
      def isExists(key: String): Boolean = {
        val jedis = InternalRedisClient.getPool.getResource
        val flag = jedis.exists(key)
        jedis.close()
        flag
      }
    }
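
    The job is stopped by creating the TEST_STOP_FLAG key in Redis, which the article does not show explicitly. A hypothetical one-off helper for triggering the shutdown (reusing InternalRedisClient with the same pool settings as initRedisPool) might look like this:

    import me.jinkun.scala.util.InternalRedisClient

    object TriggerGracefulStop {
      def main(args: Array[String]): Unit = {
        // Same pool settings as initRedisPool() above
        InternalRedisClient.makePool("47.98.119.122", 6379, 30000, 20, 10, 1)

        val jedis = InternalRedisClient.getPool.getResource
        // The value does not matter: stopByMarkKey only checks that the key exists
        jedis.set("TEST_STOP_FLAG", "1")
        jedis.close()
      }
    }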
    

    Create a topic named Test_Json

    kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic Test_Json --partitions 3 --replication-factor 3
    
