Kafka APIs for Spark Streaming integration
The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages available. The main differences are as follows:
| | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
|---|---|---|
| Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
| API Maturity | Deprecated | Stable |
| Language Support | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API | No | Yes |
| Dynamic Topic Subscription | No | Yes |
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the new integration uses the new Kafka consumer API instead of the simple consumer API, there are notable differences in usage. This version of the integration is marked as experimental, so the API may change. For Spark 2.3+ the kafka-0-10 API is recommended.
The following sections build Direct DStream demos on both APIs.
Q1: Why use the Direct DStream approach?
The receiver-based approach is implemented with Kafka's high-level consumer API. The data the receiver fetches from Kafka is stored in the memory of the Spark executors, and the jobs started by Spark Streaming then process it. With the default configuration this can lose data on failure: when the driver process dies, its executors are killed with it, and if the driver dies after the consumed offsets have already been committed to ZooKeeper but before the data has been processed, that data can no longer be recovered. To get zero data loss you must enable Spark Streaming's write-ahead log (WAL), which synchronously writes the received Kafka data to a write-ahead log on a distributed file system such as HDFS, so that even if a node fails the data can be recovered from the log. Throughput drops accordingly.
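For comparison only (this post recommends Direct mode), here is a minimal sketch of the receiver-based approach with the write-ahead log enabled. It reuses the ZooKeeper quorum, consumer group and topic from the demos below; the checkpoint directory is an assumption, and this is not part of the original demos:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Receiver-based DStream with the WAL turned on (sketch only)
 */
object ReceiverKafkaWithWAL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReceiverKafkaWithWAL")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the write-ahead log
    val ssc = new StreamingContext(conf, Seconds(60))
    // WAL data is stored under the checkpoint directory; the path below is an assumption
    ssc.checkpoint("hdfs:///tmp/receiver-wal-checkpoint")
    // High-level consumer API: ZooKeeper quorum, consumer group, Map(topic -> receiver threads)
    val stream = KafkaUtils.createStream(
      ssc,
      "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181",
      "meterdata_group",
      Map("topicA" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER) // in-memory replication is unnecessary once the WAL is on
    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}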
The Direct approach instead treats Kafka as the system that stores the data: rather than passively receiving records, it actively pulls them. It periodically queries Kafka for the latest offset of each topic+partition, which defines the offset range of each batch. When the job that processes a batch starts, it uses Kafka's consumer API to read the specified offset ranges. Offsets have to be updated manually.
Q2: What determines the parallelism in Direct mode?
In Direct mode the parallelism is determined by the number of partitions of the Kafka topic(s) being read.
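A small self-contained sketch that makes this visible, using the 0-10 API. The broker list and topic reuse the values from this post; the group id kafka_parallelism_check is made up for the example:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils
// In Direct mode each Kafka partition becomes one RDD partition, so
// rdd.getNumPartitions equals the total partition count of the subscribed topics.
object DirectParallelismCheck {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("DirectParallelismCheck"), Seconds(30))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka_parallelism_check")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("topicA"), kafkaParams))
    stream.foreachRDD { rdd =>
      // equals the number of Kafka partitions being read in this batch
      println(s"RDD partitions = ${rdd.getNumPartitions}")
    }
    // If downstream work needs more tasks than there are Kafka partitions,
    // extract the values first and repartition (this adds a shuffle), e.g.:
    // stream.map(_.value).repartition(12)
    ssc.start()
    ssc.awaitTermination()
  }
}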
More Kafka fundamentals: [kafka basics] Kafka basic concepts and applications
Cluster setup and installation: [kafka deployment] Cluster setup & quick start
The producer simulates electricity-meter readings; code below:
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
/**
* Kafka producer
*/
object KafkaProducer {
//Kafka brokers
def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
def TOPIC = "topicA"
def isAsync = false
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer")
val producer = new KafkaProducer[String, String](props)
try {
//Simulate three electricity meters producing readings: active_quan = consumption, create_time = reading time, meter_id = meter id
while (true) {
val cur_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
val arr = Array("000a3804-0315-4de4-a846-74ac37812d08", "010892ae-9d99-42cf-aab7-6d074351b15a", "01e5a01c-661d-4b65-82ee-1309497b79e7")
for (meter_id <- arr) {
val msg = "{\"active_quan\":" + ((new util.Random).nextInt(100) + 1) + ",\"create_time\":\"" + cur_time + "\",\"meter_id\":\"" + meter_id + "\"}"
val startTime = System.currentTimeMillis()
if (isAsync) {
// Send asynchronously
producer.send(new ProducerRecord(TOPIC, msg), new DemoCallback(startTime, msg))
} else {
// Send synchronously
producer.send(new ProducerRecord(TOPIC,msg))
System.out.println("Sent message: (" + msg + ")")
}
}
Thread.sleep(30000)
}
} catch {
case ex: Exception => {
println(ex)
}
} finally {
producer.close()
}
}
}
/**
* Kafka producer send callback
* @param startTime
* @param message
*/
class DemoCallback(startTime : Long, message : String) extends Callback{
override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
val elapsedTime = System.currentTimeMillis() - startTime
if (metadata != null) {
System.out.println(
"message => (" + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms")
} else {
e.printStackTrace()
}
}
}
Maven dependencies. To switch from 0.8 to 0.10, just change spark-streaming-kafka-0-8_${scala.version} to spark-streaming-kafka-0-10_${scala.version}:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huidian.spark</groupId>
<artifactId>slpark-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.3.1</spark.version>
<scala.version>2.11</scala.version>
<hadoop.version>2.7.2</hadoop.version>
<hbase.version>1.1.1</hbase.version>
<mysql.version>5.1.40</mysql.version>
<c3p0.version>0.9.1.2</c3p0.version>
</properties>
<dependencies>
<!--spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<!--MySQL JDBC driver-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>${c3p0.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
spark-streaming-kafka-0-8 example
Scala implementation
Main class that consumes Kafka with Spark Streaming:
import com.hdc.spark.streaming.utils.KafkaManager
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges
/**
* Spark Streaming reading from Kafka
* Simulates consuming electricity-meter readings
*/
object DirectKafkaMeterData {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaMeterData")
val ssc = new StreamingContext(conf, Seconds(60))
//Kafka brokers
val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
val ZK_SERVERS = "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181"
val GROUP_ID = "meterdata_group" //consumer group
val topics = Set("topicA") //topics to consume
/*
Parameter notes
AUTO_OFFSET_RESET_CONFIG
smallest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
largest: if a partition has a committed offset, consume from that offset; otherwise consume only newly produced data for that partition
disable: if every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
*/
val kafkaParams = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "smallest"
)
val kafkaManager = new KafkaManager(kafkaParams)
//create the stream
val kafkaStream: InputDStream[(String, String)] = kafkaManager.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,topics)
kafkaStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map(msg => msg._2).foreachPartition(ite=>{
ite.foreach(record => {
//process each record here
println(record)
})
})
kafkaManager.updateZKOffsets(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
Kafka offset manager class that keeps offsets in ZooKeeper. Besides using the integrated Kafka API as below, this could also be implemented directly with a ZooKeeper client API.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{ KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import scala.reflect.ClassTag
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
private val kc = new KafkaCluster(kafkaParams)
/**
* Create the direct stream
*
* @param ssc
* @param topics
* @tparam K
* @tparam V
* @tparam KD
* @tparam VD
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
ssc: StreamingContext,
topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// Before reading offsets from ZooKeeper, update them according to the actual consumption state
setOrUpdateOffsets(topics, groupId)
//Consume messages starting from the offsets stored in ZooKeeper
val kafkaStream = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
println(consumerOffsets)
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
kafkaStream
}
/**
* Before creating the stream, update the consumed offsets according to the actual consumption state.
* If the streaming job throws kafka.common.OffsetOutOfRangeException,
* the offsets stored in ZK are stale, i.e. Kafka's retention policy has already deleted the log segments that contained them.
* In that case, compare the consumerOffsets in ZK with the earliestLeaderOffsets:
* if consumerOffsets are smaller than earliestLeaderOffsets, they are stale,
* and consumerOffsets should be updated to earliestLeaderOffsets.
*
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) {
// the group has consumed before
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// Only some partitions' consumerOffsets may be stale, so update only those partitions' consumerOffsets to earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case (tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已经过时,更新为" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (offsets.nonEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {
// first-time consumption for this group
println(groupId + " consuming for the first time, topic(s): " + topics)
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
/**
* Update the consumed offsets stored in ZooKeeper
*
* @param offsetRanges
*/
def updateZKOffsets(offsetRanges: Array[OffsetRange]): Unit = {
val groupId = kafkaParams.get("group.id").get
for (offsets <- offsetRanges) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}
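As noted above, the same bookkeeping could also be done with a plain ZooKeeper client instead of KafkaCluster. Below is a minimal, hypothetical sketch using Curator: ZkOffsetCommitter is a made-up helper name, and the /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt; path layout is an assumption that mirrors the old high-level consumer's convention.
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.kafka.OffsetRange
// Hypothetical alternative to KafkaManager.updateZKOffsets: write offsets to
// ZooKeeper directly with Curator instead of going through KafkaCluster.
object ZkOffsetCommitter {
  private val client = CuratorFrameworkFactory.newClient(
    "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181",
    new ExponentialBackoffRetry(1000, 3))
  client.start()
  def commit(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    offsetRanges.foreach { o =>
      // assumed path layout, mirroring the old consumer's convention
      val path = s"/consumers/$groupId/offsets/${o.topic}/${o.partition}"
      if (client.checkExists().forPath(path) == null) {
        client.create().creatingParentsIfNeeded().forPath(path)
      }
      client.setData().forPath(path, o.untilOffset.toString.getBytes)
    }
  }
}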
spark-streaming-kafka-0-10 example
With the new consumer, offsets are stored by default in the __consumer_offsets topic. To maintain offsets manually, set enable.auto.commit=false. If it is left at true and the consumer has fetched data that is not yet fully processed when the auto-commit interval triggers an offset commit, and the consumer then crashes, the fetched-but-unprocessed data has already been committed and will never be reprocessed, i.e. it is lost.
Way 1 (officially recommended):
By default, consumer offsets are stored in the __consumer_offsets topic. If no offsets are supplied when the stream is created, the consumer group's offsets stored in __consumer_offsets are used; if there are none, consumption starts according to the auto.offset.reset policy. Offsets are committed/updated manually with stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). Example code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
* Spark Streaming reading from Kafka
*/
object DirectKafkaMeterData {
//Kafka brokers
def BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaExample")
val ssc = new StreamingContext(conf, Seconds(5)) //batch interval
/*
Parameter notes
AUTO_OFFSET_RESET_CONFIG
earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
latest: if a partition has a committed offset, consume from that offset; otherwise consume only newly produced data for that partition
none: if every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
*/
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> "kafka_group",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
val topics = Array("topicA")
//create the stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
//process the stream
stream.foreachRDD { rdd =>
//capture the offset ranges for manual offset management
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
iter.foreach(line => {
println(line.value())
})
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
// After the outputs have completed, commit the offsets back to Kafka (commitAsync is asynchronous)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
}
}
Way 2: store offsets in ZooKeeper
Main class that consumes Kafka with Spark Streaming:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming reading from Kafka
* Applicable version: spark-streaming-kafka-0-10
* (the 0.10 and 0.8 APIs differ considerably)
*/
object DirectKafkaManagerMeterData {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DirectKafkaMeterData")
val ssc = new StreamingContext(conf, Seconds(30)) //batch interval
//Kafka brokers
val BROKER_LIST = "hdc-data4:9092,hdc-data5:9092,hdc-data6:9092"
val ZK_SERVERS = "hdc-data4:2181,hdc-data5:2181,hdc-data6:2181"
val GROUP_ID = "meterdata_group" //consumer group
val topics = Array("topicA") //topics to consume
/*
Parameter notes
AUTO_OFFSET_RESET_CONFIG
earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
latest: if a partition has a committed offset, consume from that offset; otherwise consume only newly produced data for that partition
none: if every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
*/
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> BROKER_LIST,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> GROUP_ID,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
//maintain offsets manually in ZooKeeper
val zkManager = new KafkaOffsetZKManager(ZK_SERVERS)
val fromOffsets = zkManager.getFromOffset(topics,GROUP_ID)
//create the stream
var stream:InputDStream[ConsumerRecord[String, String]] = null
if (fromOffsets.size > 0){
stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
}else{
stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
println("第一次消费 Topic:" + topics)
}
//process the stream
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val rs = rdd.map(record => (record.offset(), record.partition(), record.value())).collect()
for(item <- rs) println(item)
// Process the data and store it to HDFS, HBase, etc.
// (storage code omitted)
// After processing, save/update the offsets
zkManager.storeOffsets(offsetRanges,GROUP_ID)
}
ssc.start()
ssc.awaitTermination()
}
}
Kafka offset ZooKeeper manager class:
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
/**
* Kafka offset manager backed by ZooKeeper
* Applicable version: spark-streaming-kafka-0-10
*
* @param zkServers ZooKeeper connection string
*/
class KafkaOffsetZKManager(zkServers : String) {
//create the ZooKeeper client
val zkClient = {
val client = CuratorFrameworkFactory
.builder
.connectString(zkServers)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
// .namespace("kafka") //create a session under an isolated namespace
.build()
client.start()
client
}
val _base_path_of_kafka_offset = "/kafka/offsets" //base path for offsets in ZK
/**
* Get the offsets the consumer group has already consumed for the given topics (i.e. the starting offsets for this run)
* @param topics topics
* @param groupName consumer group
* @return
*/
def getFromOffset(topics: Array[String], groupName:String):Map[TopicPartition, Long] = {
// Difference between Kafka 0.8 and 0.10: 0.10 -> TopicPartition, 0.8 -> TopicAndPartition
var fromOffset: Map[TopicPartition, Long] = Map()
for(topic <- topics){
// Read the offsets saved in ZK as the DStream's starting position; if the path does not exist, create it
val zkTopicPath = s"${_base_path_of_kafka_offset}/${groupName}/${topic}"
// make sure the path exists
checkZKPathExists(zkTopicPath)
// get the topic's child nodes, i.e. its partitions
val childrens = zkClient.getChildren().forPath(zkTopicPath)
// iterate over the partitions
import scala.collection.JavaConversions._
for (p <- childrens){
// read each child node's data, i.e. the offset
val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
// convert the offset to Long
val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
fromOffset += (new TopicPartition(topic, Integer.parseInt(p)) -> offSet)
}
}
println(fromOffset)
fromOffset
}
/**
* Check whether the path exists in ZK; create it if it does not
* @param path
* @return
*/
def checkZKPathExists(path: String)={
if (zkClient.checkExists().forPath(path) == null) {
zkClient.create().creatingParentsIfNeeded().forPath(path)
}
}
/**
* Save or update offsets
* @param offsetRange
* @param groupName
*/
def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {
for (o <- offsetRange){
val zkPath = s"${_base_path_of_kafka_offset}/${groupName}/${o.topic}/${o.partition}"
// make sure the path exists
checkZKPathExists(zkPath)
// Write the offset for the corresponding partition (first write or update)
println("--- Writing offset to ZK ---\nTopic:" + o.topic + ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
}
}
}