2018-12-25
Author: 宇智波_佐助 | Published 2018-12-25 15:35

Spark Streaming consuming Kafka data with manually managed offsets:

Screenshot of the first consumption (image not reproduced here):

Screenshot after manually killing the job and restarting it (image not reproduced here):


The KafkaManager class:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
  * Manages Kafka offsets manually (stored in ZooKeeper).
  */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * Creates the direct stream.
    */
  def createDirectStream[K: ClassTag,
                         V: ClassTag,
                         KD <: Decoder[K]: ClassTag,
                         VD <: Decoder[V]: ClassTag](ssc: StreamingContext,
                                                     kafkaParams: Map[String, String],
                                                     topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")
    // Before reading offsets from ZooKeeper, reconcile them with the actual state of the cluster.
    setOrUpdateOffsets(topics, groupId)
    // Read the offsets from ZooKeeper and start consuming messages from there.
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
    * Before creating the stream, update the consumer offsets according to what has actually been consumed.
    * @param topics
    * @param groupId
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      // Fetch the offsets stored in ZooKeeper for this consumer group and the topic's partitions.
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { // the group has consumed before
        /*
         * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets
         * saved in ZooKeeper are stale: Kafka's retention policy has already deleted the
         * log segments that contained them. To handle this, compare the ZooKeeper
         * consumerOffsets with earliestLeaderOffsets; if consumerOffsets are smaller,
         * they are stale and are reset to earliestLeaderOffsets.
         */
        println("------ already consumed ------")
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get
        // Only some partitions may be stale, so only those are reset to earliestLeaderOffsets.
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, resetting to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        // If new partitions were added to the topic, consume them from the beginning.
        val earliestTopicAndPartition: Set[TopicAndPartition] = earliestLeaderOffsets.keySet
        for (topicAndPartition <- earliestTopicAndPartition) {
          if (!consumerOffsets.contains(topicAndPartition)) {
            println("consumer group:" + groupId + ",topic:" + topicAndPartition.topic + ",partition:" + topicAndPartition.partition +
              " new kafka partition, setting offset to 0L")
            offsets += (topicAndPartition -> 0L)
          }
        }
        if (offsets.nonEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // the group has never consumed
        println("------ never consumed ------")
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
    * Commits the consumed offsets back to ZooKeeper.
    * @param rdd
    */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
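A note on the package declaration: the class is placed in org.apache.spark.streaming.kafka, presumably because KafkaCluster was package-private in older Spark 1.x releases and can only be reached from inside that package (an assumption about the Spark version used in the post; check your own build). The same KafkaCluster API can also be used to read the committed offsets back from ZooKeeper, which is a handy way to verify the kill-and-restart behaviour described at the top. A minimal sketch, not part of the original post, with an invented helper name and assumed to live in the same package:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

// Hypothetical helper: prints the offsets that KafkaManager.updateZKOffsets
// has committed for a consumer group, using the same KafkaCluster calls as above.
object ZkOffsetInspector {
  def printOffsets(kafkaParams: Map[String, String], topics: Set[String]): Unit = {
    val kc = new KafkaCluster(kafkaParams)
    val groupId = kafkaParams("group.id")
    for {
      partitions <- kc.getPartitions(topics).right
      offsets <- kc.getConsumerOffsets(groupId, partitions).right
    } {
      offsets.foreach { case (TopicAndPartition(topic, partition), offset) =>
        println(s"group=$groupId topic=$topic partition=$partition offset=$offset")
      }
    }
  }
}

Running this before and after a restart should show the same offsets, confirming that consumption resumes where the previous run stopped.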

The test object:

package streaming

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaManager
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaStreaming {

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    lines.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupId> is a consumer group
           |
        """.stripMargin)
      System.exit(1)
    }
    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create the context with a 5 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[3]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Create the direct Kafka stream with the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val km = new KafkaManager(kafkaParams)
    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the messages first
        processRdd(rdd)
        // Then commit the offsets
        km.updateZKOffsets(rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Modification in this version:

If new partitions are added to a Kafka topic and ZooKeeper has no offsets for them yet, those partitions are consumed from the beginning.
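Distilled from setOrUpdateOffsets above, the change amounts to the following sketch (the object and method names are invented for illustration): any partition reported by the brokers that has no committed offset in ZooKeeper is treated as new and starts at offset 0L.

import kafka.common.TopicAndPartition

// Hypothetical helper, not part of the original code: computes the starting
// offsets for partitions the consumer group has never seen before.
object NewPartitionOffsets {
  def offsetsForNewPartitions(
      brokerPartitions: Set[TopicAndPartition],
      committedOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
    (brokerPartitions -- committedOffsets.keySet).map(tp => tp -> 0L).toMap
}

For a brand-new partition, 0L and the earliest leader offset coincide; they only differ if retention has already deleted data from that partition before the job notices it, in which case resetting to earliestLeaderOffsets(tp).offset instead of 0L would avoid an out-of-range read.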
