[Spark Development] SparkStreaming + Kafka

Author: 粮忆雨 | Published 2018-12-14 19:04

    Kafka APIs integrated with Spark Streaming

    The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate Spark Streaming packages available. The main differences are as follows:

                                  spark-streaming-kafka-0-8    spark-streaming-kafka-0-10
    Broker Version                0.8.2.1 or higher            0.10.0 or higher
    API Maturity                  Deprecated                   Stable
    Language Support              Scala, Java, Python          Scala, Java
    Receiver DStream              Yes                          No
    Direct DStream                Yes                          Yes
    SSL / TLS Support             No                           Yes
    Offset Commit API             No                           Yes
    Dynamic Topic Subscription    No                           Yes

    The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the new integration uses the new Kafka consumer API instead of the simple consumer API, usage differs noticeably. This version of the integration is marked as experimental, so the API may change. For Spark 2.3+ the Kafka 0.10 API is recommended.

    Below are Direct DStream demos based on both APIs.

    Q1: Why use the Direct DStream approach?
    The Receiver approach is implemented with Kafka's high-level consumer API. Data fetched from Kafka by the receiver is stored in Spark executor memory, and the jobs launched by Spark Streaming then process it. With the default configuration this can lose data on failure (when the driver process dies, its executors are killed with it; if the driver dies right after the consumer offsets have been updated in ZooKeeper, the corresponding data can no longer be recovered). To guarantee zero data loss you must enable Spark Streaming's write-ahead log (WAL), which synchronously writes the received Kafka data to a write-ahead log on a distributed file system such as HDFS, so the data can be recovered from the log even if a node fails. Throughput drops accordingly.
    The Direct approach instead treats Kafka as a store to pull from rather than a source that pushes data: Spark Streaming periodically queries Kafka for the latest offset of each topic+partition and uses those offsets to define the range of each batch. When the job that processes the data starts, it uses Kafka's consumer API to fetch exactly that offset range. Offsets must be updated manually.
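
    For reference, here is a minimal sketch of the receiver-based approach with the WAL enabled. It is not part of the original demos: it assumes the spark-streaming-kafka-0-8 dependency, reuses the broker/ZooKeeper hosts, group and topic from the examples below, and the checkpoint path is an arbitrary placeholder.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReceiverWALSketch {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("ReceiverWALSketch")
                    // enable the write-ahead log so received data survives driver/executor failure
                    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
            val ssc = new StreamingContext(conf, Seconds(60))
            // the WAL requires a checkpoint directory on a reliable file system (example path)
            ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

            // receiver-based stream: offsets are tracked in ZooKeeper by the high-level consumer
            val stream = KafkaUtils.createStream(
                ssc,
                "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181", // ZooKeeper quorum
                "meterdata_group",                              // consumer group
                Map("topicA" -> 1),                             // topic -> receiver thread count
                StorageLevel.MEMORY_AND_DISK_SER                // serialized storage; the WAL adds durability
            )
            stream.map(_._2).print()

            ssc.start()
            ssc.awaitTermination()
        }
    }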

    Q2: What determines the parallelism of the Direct approach?
    The parallelism of the Direct approach is determined by the number of partitions of the Kafka topic being read: each Kafka partition maps to exactly one Spark partition.
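
    As a quick illustration (a sketch, not part of the original code; kafkaStream stands for a direct stream like the ones created in the examples below, and 12 is an arbitrary value), you can check this mapping inside foreachRDD and repartition when more downstream parallelism is needed:

    kafkaStream.foreachRDD { rdd =>
        // with the Direct approach this equals the number of Kafka partitions of the topic
        println(s"RDD partitions = ${rdd.getNumPartitions}")

        // if per-record processing is heavy, shuffle into more partitions;
        // note this breaks the 1:1 Kafka-to-Spark partition mapping for the shuffled RDD
        rdd.repartition(12).foreachPartition { iter =>
            iter.foreach(record => println(record))
        }
    }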

    More Kafka fundamentals: 【kafka-基础】kafka基础概念及应用
    Cluster setup and installation: 【kafka-部署】集群搭建&快速开始

    The producer simulates electricity-meter readings; the code is as follows:

    import java.text.SimpleDateFormat
    import java.util.{Date, Properties}
    
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.serialization.StringSerializer
    
    
    /**
      * Kafka producer
      */
    object KafkaProducer {
        // Kafka brokers
        def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
    
        def TOPIC = "topicA"
    
        def isAsync = false
    
        def main(args: Array[String]): Unit = {
    
            val props = new Properties()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer")
            val producer = new KafkaProducer[String, String](props)
            try {
                // simulate three meters producing readings: active_quan = energy value, create_time = reading time, meter_id = meter id
                while (true) {
                    val cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
                    val arr = Array("000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7")
                    for (meter_id <- arr) {
                        val msg = "{\"active_quan\":" + ((new util.Random).nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}"
                        val startTime = System.currentTimeMillis()
                        if (isAsync) {
                            // Send asynchronously
                            producer.send(new ProducerRecord(TOPIC, msg), new DemoCallback(startTime, msg))
                        } else {
                            // Send synchronously
                            producer.send(new ProducerRecord(TOPIC,msg))
                            System.out.println("Sent message: (" + msg + ")")
                        }
                    }
                    Thread.sleep(30000)
                }
            } catch {
                case ex: Exception => {
                    println(ex)
                }
            } finally {
                producer.close
            }
        }
    }
    
    
    /**
      * Kafka send callback: logs record metadata on success, prints the error otherwise
      * @param startTime
      * @param message
      */
    class DemoCallback(startTime : Long, message : String) extends Callback{
    
        override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
            val elapsedTime = System.currentTimeMillis() - startTime
            if (metadata != null) {
                System.out.println(
                    "message => (" + message + ") sent to partition(" + metadata.partition() +
                            "), " +
                            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms")
            } else {
                e.printStackTrace()
            }
        }
    }
    

    Maven dependencies. To switch from 0.8 to 0.10, simply change the artifactId spark-streaming-kafka-0-8_${scala.version} to spark-streaming-kafka-0-10_${scala.version}.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.huidian.spark</groupId>
        <artifactId>slpark-example</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spark.version>2.3.1</spark.version>
            <scala.version>2.11</scala.version>
            <hadoop.version>2.7.2</hadoop.version>
            <hbase.version>1.1.1</hbase.version>
            <mysql.version>5.1.40</mysql.version>
            <c3p0.version>0.9.1.2</c3p0.version>
        </properties>
    
        <dependencies>
            <!--spark-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!--hbase-->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
        <!--MySQL database connection-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>c3p0</groupId>
                <artifactId>c3p0</artifactId>
                <version>${c3p0.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.19</version>
                    <configuration>
                        <skip>true</skip>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    spark-streaming-kafka-0-8 example

    Scala implementation

    Main class for consuming Kafka with Spark Streaming

    import com.hdc.spark.streaming.utils.KafkaManager
    import kafka.serializer.StringDecoder
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    /**
      * Spark Streaming reading from Kafka:
      * simulates consuming meter-reading data
      */
    object DirectKafkaMeterData {
    
    
        def main(args: Array[String]): Unit = {
    
            val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaMeterData")
            val ssc = new StreamingContext(conf, Seconds(60))
            // Kafka brokers
            val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
            val ZK_SERVERS = "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181"
            val GROUP_ID = "meterdata_group" // consumer group

            val topics = Set("topicA") // topics to consume
    
            /*
            Parameter notes
            AUTO_OFFSET_RESET_CONFIG
                smallest: when a partition has a committed offset, consume from it; otherwise consume from the earliest available offset
                largest: when a partition has a committed offset, consume from it; otherwise consume only newly produced data
                any other value: an exception is thrown to the consumer
             */
            val kafkaParams = Map[String, String](
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
                ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "smallest"
            )
            val kafkaManager = new KafkaManager(kafkaParams)
            // create the direct stream
            val kafkaStream: InputDStream[(String, String)] = kafkaManager.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,topics)
    
            kafkaStream.foreachRDD { rdd =>
                val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                rdd.map(msg => msg._2).foreachPartition(ite=>{
                    ite.foreach(record => {
                        // process the record here
                        println(record)
                    })
                })
                kafkaManager.updateZKOffsets(offsetRanges)
            }
            ssc.start()
            ssc.awaitTermination()
        }
    
    }
    

    Kafka offset management class that keeps offsets in ZooKeeper. Besides using the integrated Kafka API as below, this could also be implemented with a ZooKeeper client API.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.Decoder
    import org.apache.spark.SparkException
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{ KafkaCluster, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
    
    import scala.reflect.ClassTag
    
    
    class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
    
        private val kc = new KafkaCluster(kafkaParams)
    
        /**
      * Create the direct stream
          *
          * @param ssc
          * @param topics
          * @tparam K
          * @tparam V
          * @tparam KD
          * @tparam VD
          * @return
          */
        def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
              ssc: StreamingContext,
              topics: Set[String]): InputDStream[(K, V)] = {
            val groupId = kafkaParams.get("group.id").get
        // before reading offsets from ZooKeeper, reconcile them with the actual state of the topic
        setOrUpdateOffsets(topics, groupId)
        // read the offsets from ZooKeeper and start consuming messages from there
            val kafkaStream = {
                val partitionsE = kc.getPartitions(topics)
                if (partitionsE.isLeft)
                    throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
                val partitions = partitionsE.right.get
                val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
                if (consumerOffsetsE.isLeft)
                    throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
                val consumerOffsets = consumerOffsetsE.right.get
                println(consumerOffsets)
                KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
                    ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
            }
            kafkaStream
        }
    
    /**
      * Before creating the stream, reconcile the stored consumer offsets with the actual state of the topic.
      * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets saved in ZK are stale,
      * i.e. Kafka's retention policy has already deleted the log segments containing them.
      * To handle this, compare the consumerOffsets stored in ZK with the earliestLeaderOffsets:
      * if consumerOffsets are smaller than earliestLeaderOffsets, they are out of date
      * and are reset to earliestLeaderOffsets.
      *
      * @param topics
      * @param groupId
      */
        private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
            topics.foreach(topic => {
                var hasConsumed = true
                val partitionsE = kc.getPartitions(Set(topic))
                if (partitionsE.isLeft)
                    throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
                val partitions = partitionsE.right.get
                val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
                if (consumerOffsetsE.isLeft) hasConsumed = false
                if (hasConsumed) {
                // this group has consumed the topic before
                    val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
                    if (earliestLeaderOffsetsE.isLeft)
                        throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
                    val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
                    val consumerOffsets = consumerOffsetsE.right.get
    
                // possibly only some partitions have stale consumerOffsets, so only those partitions are reset to earliestLeaderOffsets
                    var offsets: Map[TopicAndPartition, Long] = Map()
                    consumerOffsets.foreach({ case (tp, n) =>
                        val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
                        if (n < earliestLeaderOffset) {
                            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
                                    " offsets已经过时,更新为" + earliestLeaderOffset)
                            offsets += (tp -> earliestLeaderOffset)
                        }
                    })
                    if (!offsets.isEmpty) {
                        kc.setConsumerOffsets(groupId, offsets)
                    }
                } else {
                // first time this consumer group consumes the topic
                println(groupId + " consuming for the first time, topics: " + topics)
                    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
                    var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
                    if (reset == Some("smallest")) {
                        val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
                        if (leaderOffsetsE.isLeft)
                            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
                        leaderOffsets = leaderOffsetsE.right.get
                    } else {
                        val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
                        if (leaderOffsetsE.isLeft)
                            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
                        leaderOffsets = leaderOffsetsE.right.get
                    }
                    val offsets = leaderOffsets.map {
                        case (tp, offset) => (tp, offset.offset)
                    }
                    kc.setConsumerOffsets(groupId, offsets)
                }
            })
        }
    
        /**
      * Update the consumed offsets stored in ZooKeeper
          *
          * @param offsetRanges
          */
        def updateZKOffsets(offsetRanges: Array[OffsetRange]): Unit = {
            val groupId = kafkaParams.get("group.id").get
            for (offsets <- offsetRanges) {
                val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
                val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
                if (o.isLeft) {
                    println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
                }
            }
        }
    
    }
    

    spark-streaming-kafka-0-10 example

    With the new Kafka consumer, consumed offsets are stored by default in the internal __consumer_offsets topic. To maintain offsets manually you must set enable.auto.commit=false. If it is true, the commit interval can fire and commit offsets while the consumer has fetched records that are not yet fully processed; if the consumer then crashes, those fetched-but-unprocessed records have already been committed and will never be processed again, causing data loss.

    Way 1 (officially recommended):
    By default, consumer offsets are stored in the __consumer_offsets topic. If no offsets are supplied when the stream is created, consumption starts from the committed offsets of the corresponding consumer group in __consumer_offsets; if there are none, it follows the auto.offset.reset policy. Offsets are committed/updated manually with stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). Example code:

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    /**
      * Spark Streaming reading from Kafka (0-10 API; offsets committed back to Kafka)
      */
    object DirectKafkaMeterData {
    
        // Kafka brokers
        def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
    
        def main(args: Array[String]): Unit = {
    
            val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaExample")
            val ssc = new StreamingContext(conf, Seconds(5)) // batch interval
    
            /*
            Parameter notes
            AUTO_OFFSET_RESET_CONFIG
                earliest: when a partition has a committed offset, consume from it; otherwise consume from the beginning
                latest: when a partition has a committed offset, consume from it; otherwise consume only newly produced data
                none: consume from the committed offsets when every partition has one; throw an exception if any partition has none
             */
            val kafkaParams = Map[String, Object](
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
                ConsumerConfig.GROUP_ID_CONFIG -> "kafka_group",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
            )
    
            val topics = Array("topicA")
            // create the direct stream
            val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams)
            )
            // process the stream
            stream.foreachRDD { rdd =>
                // capture the offset ranges of this batch (committed manually below)
                val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
                rdd.foreachPartition { iter =>
                        iter.foreach(line => {
                            println(line.value())
                        })
                    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
                }
                // once the outputs have completed, commit the offsets back to Kafka (asynchronously)
                stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
            }
            ssc.start()
            ssc.awaitTermination()
        }
    }

    Way 2: store offsets in ZooKeeper
    Main class for consuming Kafka with Spark Streaming

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Spark Streaming reading from Kafka
      * Applies to: spark-streaming-kafka-0-10
      * (the 0.10 and 0.8 APIs differ significantly)
      */
    object DirectKafkaManagerMeterData {
    
        def main(args: Array[String]): Unit = {
    
            val conf = new SparkConf().setMaster("local").setAppName("DirectKafkaMeterData")
            val ssc = new StreamingContext(conf, Seconds(30)) // batch interval
            // Kafka brokers
            val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
            val ZK_SERVERS = "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181"
            val GROUP_ID = "meterdata_group" // consumer group

            val topics = Array("topicA") // topics to consume
    
            /*
            Parameter notes
            AUTO_OFFSET_RESET_CONFIG
                earliest: when a partition has a committed offset, consume from it; otherwise consume from the beginning
                latest: when a partition has a committed offset, consume from it; otherwise consume only newly produced data
                none: consume from the committed offsets when every partition has one; throw an exception if any partition has none
             */
            val kafkaParams = Map[String, Object](
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
                ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
            )
    
            // offsets are maintained manually in ZooKeeper
            val zkManager = new KafkaOffsetZKManager(ZK_SERVERS)
            val fromOffsets = zkManager.getFromOffset(topics,GROUP_ID)
            // create the direct stream, starting from the stored offsets when they exist
            var stream:InputDStream[ConsumerRecord[String, String]] = null
            if (fromOffsets.size > 0){
                stream = KafkaUtils.createDirectStream[String, String](
                    ssc,
                    PreferConsistent,
                    Subscribe[String, String](topics, kafkaParams, fromOffsets)
                )
            }else{
                stream = KafkaUtils.createDirectStream[String, String](
                    ssc,
                    PreferConsistent,
                    Subscribe[String, String](topics, kafkaParams)
                )
                println("第一次消费 Topic:" + topics)
            }
    
            // process the stream
            stream.foreachRDD { rdd =>
                val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                val rs = rdd.map(record => (record.offset(), record.partition(), record.value())).collect()
                for(item <- rs) println(item)
                // process and store the data in HDFS, HBase, etc.
                // (storage code omitted)
                // after the data has been processed, save/update the offsets
                zkManager.storeOffsets(offsetRanges,GROUP_ID)
            }
    
            ssc.start()
            ssc.awaitTermination()
        }
    
    }
    

    ZooKeeper-based Kafka offset management class

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange
    
    /**
      * ZooKeeper-based Kafka offset management class
      * Applies to: spark-streaming-kafka-0-10
      *
      * @param zkServers ZooKeeper connection string
      */
    class KafkaOffsetZKManager(zkServers : String) {
    
        // create the ZooKeeper (Curator) client
        val zkClient = {
            val client = CuratorFrameworkFactory
                    .builder
                    .connectString(zkServers)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    //                .namespace("kafka") // optionally create the session under an isolated namespace
                    .build()
            client.start()
            client
        }
    
        val _base_path_of_kafka_offset = "/kafka/offsets" // root path for offsets in ZK
    
    
        /**
          * Get the offsets the consumer group has already consumed for the given topics (i.e. the starting offsets for this run)
          * @param topics topic set
          * @param groupName consumer group
          * @return
          */
        def getFromOffset(topics: Array[String], groupName:String):Map[TopicPartition, Long] = {
            // API difference between Kafka 0.8 and 0.10: 0.10 -> TopicPartition, 0.8 -> TopicAndPartition
            var fromOffset: Map[TopicPartition, Long] = Map()
            for(topic <- topics){
                // read the offsets stored in ZK as the starting position of the DStream; if absent, create the path
                val zkTopicPath = s"${_base_path_of_kafka_offset}/${groupName}/${topic}"
                // make sure the path exists
                checkZKPathExists(zkTopicPath)
                // the children of the topic node are the partitions
                val childrens = zkClient.getChildren().forPath(zkTopicPath)
                // iterate over the partitions
                import scala.collection.JavaConversions._
                for (p <- childrens){
                    // the data of each child node is the offset
                    val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
                    // convert the offset to Long
                    val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
                    fromOffset += (new TopicPartition(topic, Integer.parseInt(p)) -> offSet)
                }
            }
            println(fromOffset)
            fromOffset
        }
    
        /**
          * Check whether the path exists in ZK; create it if it does not
          * @param path
          * @return
          */
        def checkZKPathExists(path: String)={
            if (zkClient.checkExists().forPath(path) == null) {
                zkClient.create().creatingParentsIfNeeded().forPath(path)
            }
        }
    
        /**
          * Save or update offsets
          * @param offsetRange
          * @param groupName
          */
        def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {
            for (o <- offsetRange){
                val zkPath = s"${_base_path_of_kafka_offset}/${groupName}/${o.topic}/${o.partition}"
                // make sure the path exists
                checkZKPathExists(zkPath)
                // first write or update of the offset for this partition
                println("--- writing offset to ZK ---\nTopic:" + o.topic + ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
                zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
            }
        }
    
    }
    
