1 Spark-streaming
1.1 Spark-streaming's KafkaManager class
1.1.1 KafkaManager mainly wraps two methods
When the program starts, it fetches the offsets that the consumer group has saved in ZK, together with the topic's current earliest and latest offsets, and from these works out the correct offsets to start from. The code is as follows:
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
  topics.foreach(topic => {
    var hasConsumed = true
    val partitionsE = kc.getPartitions(Set(topic))
    if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get.mkString("\n")}")
    val partitions: Set[TopicAndPartition] = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) hasConsumed = false
    log.info("consumerOffsetsE.isLeft: " + consumerOffsetsE.isLeft)
    if (hasConsumed) {
      // The group has consumed this topic before
      log.info("group has consumed before")
      /**
       * The offsets saved in ZK may be stale, i.e. Kafka's retention policy has already
       * deleted the log segments containing them. To handle this, compare the consumer
       * offsets in ZK against earliestLeaderOffsets: if consumerOffsets is smaller than
       * earliestLeaderOffsets, the saved offsets are stale, so reset consumerOffsets
       * to earliestLeaderOffsets.
       */
      val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsE.isLeft) throw new SparkException(s"get earliest offsets failed: ${earliestLeaderOffsetsE.left.get.mkString("\n")}")
      val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
      val consumerOffsets = consumerOffsetsE.right.get
      // Possibly only some partitions have stale consumerOffsets, so reset only those
      // partitions to earliestLeaderOffsets
      var offsets: Map[TopicAndPartition, Long] = Map()
      consumerOffsets.foreach({
        case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            log.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, resetting to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
      })
      log.info("offsets: " + consumerOffsets)
      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
    } else {
      // The group has never consumed this topic
      log.info("group has not consumed before")
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
      var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
      if (reset == Some("smallest")) {
        leaderOffsets = kc.getEarliestLeaderOffsets(partitions).right.get
      } else {
        leaderOffsets = kc.getLatestLeaderOffsets(partitions).right.get
      }
      val offsets = leaderOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }
      log.info("offsets: " + offsets)
      kc.setConsumerOffsets(groupId, offsets)
    }
  })
}
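setOrUpdateOffsets is called from the createDirectStream wrapper on KafkaManager, which is what the template in section 1.3 uses. That wrapper is not listed in this post, so the following is only a minimal sketch of what it could look like, reusing the kc: KafkaCluster field from the code above; the error handling and the (key, message) handler are assumptions, not the original implementation.
// Sketch only: uses org.apache.spark.streaming.kafka.KafkaUtils and kafka.message.MessageAndMetadata
def createDirectStream[K: ClassTag, V: ClassTag,
    KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]): InputDStream[(K, V)] = {
  val groupId = kafkaParams("group.id")
  // Validate (or initialise) the offsets stored in ZK before reading them back
  setOrUpdateOffsets(topics, groupId)
  // Read the now-valid consumer offsets and start the direct stream from them
  val partitions = kc.getPartitions(topics).right.getOrElse(
    throw new SparkException("get kafka partitions failed"))
  val consumerOffsets = kc.getConsumerOffsets(groupId, partitions).right.getOrElse(
    throw new SparkException("get consumer offsets failed"))
  KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, consumerOffsets,
    (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}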
After the DStream has been processed, the offsets in ZK are updated manually; if the job throws an exception, the offsets are not updated, which gives at-least-once semantics. The code is as follows:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  /**
   * Refresh currentOffsets here so the stream adapts to changes in the number of
   * upstream partitions
   */
  updateCurrentOffsetForKafkaPartitionChange()
  super.compute(validTime)
}

def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
  val groupId = kafkaParams.get("group.id").get
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    log.info(s"${DateTimeUtils.getStringDateTime()} @@@@@@ topic ${offsets.topic} partition ${offsets.partition} fromoffset ${offsets.fromOffset} untiloffset ${offsets.untilOffset}" +
      s" diff ${offsets.untilOffset - offsets.fromOffset} #######")
    if (o.isLeft) {
      log.error(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
1.2 Spark-streaming's adaptive partition-change class
Add an AdaptiveUpStreamDirectKafkaInputDStream class so the stream adapts when partitions are added: for example, when a topic gains partitions, Spark-streaming picks up the change automatically. The key code is as follows (a hedged skeleton of the class declaration is sketched after the getTopicMeta listing):
private def updateCurrentOffsetForKafkaPartitionChange(): Unit = {
  val topic = currentOffsets.head._1.topic
  val nextPartitions: Int = getTopicMeta(topic) match {
    case Some(x) => x.partitionsMetadata.size()
    case _ => 0
  }
  val currPartitions = currentOffsets.keySet.size
  if (nextPartitions > currPartitions) {
    var i = currPartitions
    while (i < nextPartitions) {
      currentOffsets = currentOffsets + (TopicAndPartition(topic, i) -> 0)
      i = i + 1
    }
  }
  logInfo(s"######### ${nextPartitions} currentPartitions ${currentOffsets.keySet.size} ########")
}
private def getTopicMeta(topic: String): Option[TopicMetadata] = {
  var metaData: Option[TopicMetadata] = None
  var consumer: Option[SimpleConsumer] = None
  val topics = List[String](topic)
  val brokerList = MTkafkaParams.get("metadata.broker.list").get.split(",")
  //val brokerList = kafkaBrokerList.split(",")
  brokerList.foreach(
    item => {
      val hostPort = item.split(":")
      try {
        breakable {
          for (i <- 0 to 3) {
            consumer = Some(new SimpleConsumer(host = hostPort(0), port = hostPort(1).toInt,
              soTimeout = 10000, bufferSize = 64 * 1024, clientId = "leaderLookup"))
            val req: TopicMetadataRequest = new TopicMetadataRequest(topics.asJava)
            val resp = consumer.get.send(req)
            metaData = Some(resp.topicsMetadata.get(0))
            if (metaData.get.errorCode == ErrorMapping.NoError) break()
          }
        }
      } catch {
        case e: Throwable =>
          logInfo(s" ###### Error in AdaptiveUpStreamDirectKafkaInputDStream ${e} ######")
      } finally {
        // Close the SimpleConsumer so connections are not leaked across brokers/retries
        consumer.foreach(_.close())
      }
    }
  )
  metaData
}
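Only the two methods above are listed here. For orientation, this is a hedged skeleton of how the class itself might be declared; it is an assumption pieced together from the compute override in section 1.1 and the spark-streaming-kafka 0.8 API (DirectKafkaInputDStream is not public, so such a subclass normally lives under the org.apache.spark.streaming.kafka package), not the author's actual file.
// Hypothetical skeleton only; constructor mirrors DirectKafkaInputDStream, and MTkafkaParams is
// assumed to be the kafka params carrying "metadata.broker.list" used by getTopicMeta above
package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

class AdaptiveUpStreamDirectKafkaInputDStream[
    K: ClassTag, V: ClassTag,
    U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag, R: ClassTag](
    ssc_ : StreamingContext,
    MTkafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R)
  extends DirectKafkaInputDStream[K, V, U, T, R](ssc_, MTkafkaParams, fromOffsets, messageHandler) {

  // The compute override from section 1.1, plus updateCurrentOffsetForKafkaPartitionChange()
  // and getTopicMeta() listed above, go here
}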
1.3 Spark-streaming programming template code
object TestMain {

  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    var conf_file = ""
    if (args.length > 0) {
      conf_file = args(0)
    }
    //init conf_file
    SparkUtils.initialize(conf_file)
    //init ssc
    val sparkConf = SparkUtils.buildSparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(SparkUtils.getValueByKey(Constants.SPARK_APP_DURATION).toLong))
    //init kafka params
    val kafkaParams = SparkUtils.buildkafkaParams()
    val topics = SparkUtils.getValueByKey(Constants.KAFKA_TOPICS)
    //init DStream
    val km = new KafkaManager(kafkaParams)
    val kafkaDirectStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.split(",").toSet)
    LOG.info("======Initial Done======")
    kafkaDirectStream.cache()
    // init hbase config to broadcast to cluster executors
    val hbaseConfig = ssc.sparkContext.broadcast(SparkUtils.buildHbaseProps())
    //transform
    val processedDStream = kafkaDirectStream.map(_._2)
    //to do something
    //action
    processedDStream.foreachRDD { rdd =>
      //LOG.info("### rdd count = " + rdd.count() + "### rdd partition count:" + rdd.partitions.length)
      rdd.foreachPartition { partitionOfRecords =>
        //to do something
      }
    }
    //update zk offsets
    kafkaDirectStream.foreachRDD(rdd => {
      if (!rdd.isEmpty)
        km.updateZKOffsets(rdd)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
1.4 Spark-streaming utility classes
Jedis pool
/**
 * Created by lancerlin on 2018/1/3.
 */
class MyRedisConnectionPool(createMyRedisConnnectionPoolFunc: () => RedisConnectionPool) extends Serializable {

  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  lazy val pool = createMyRedisConnnectionPoolFunc()
}

object MyRedisConnectionPool {

  lazy val config = {
    val cf = new PoolConfig
    cf.setMaxTotal(100)
    cf.setMaxIdle(5)
    cf.setMaxWaitMillis(1000)
    cf.setTestOnBorrow(true)
    cf
  }

  def apply[K, V](props: java.util.Properties): MyRedisConnectionPool = {
    val createMyRedisConnnectionPoolFunc = () => {
      val pool = new RedisConnectionPool(config, props)
      sys.addShutdownHook {
        pool.close()
      }
      pool
    }
    new MyRedisConnectionPool(createMyRedisConnnectionPoolFunc)
  }
}
Usage in the streaming code:
//broadcast redis info
val redisConfig = ssc.sparkContext.broadcast(SparkUtils.buildRedisProps())
.....
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionRecords => {
    val pool = MyRedisConnectionPool(redisConfig.value).pool
    val proxy = pool.getConnection
    while (partitionRecords.hasNext) {
      val record = partitionRecords.next()
      // do something with the record through the pooled connection
      proxy.doMethod()
    }
    pool.returnConnection(proxy)
  })
})
The same pattern applies to the Kafka producer (a usage sketch follows the class):
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
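Usage follows the same broadcast-the-wrapper pattern as the Redis pool above. A minimal sketch; the broker list, output topic and serializer settings shown here are placeholders rather than values from this project:
// Broadcast the lazy wrapper once from the driver; the actual KafkaProducer is created per executor
val kafkaSink = ssc.sparkContext.broadcast(MySparkKafkaProducer[String, String](Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092", // placeholder broker list
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer")))

processedDStream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach { record =>
      // send() instantiates the producer lazily the first time it runs on this executor
      kafkaSink.value.send("output-topic", record) // placeholder topic
    }
  }
}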
The same pattern applies to the MySQL utilities (a usage sketch follows the class):
/**
 * Created by lancerlin on 2017/12/28.
 */
class MysqlPool(createMysqlPoolFunc: () => JdbcConnectionPool) extends Serializable {

  println("init ....")

  lazy val pool = createMysqlPoolFunc()
}

object MysqlPool {

  lazy val config = {
    val cf = new PoolConfig
    cf.setMaxTotal(100)
    cf.setMaxIdle(5)
    cf.setMaxWaitMillis(1000)
    cf.setTestOnBorrow(true)
    cf
  }

  def apply[K, V](props: java.util.Properties): MysqlPool = {
    val createMysqlPoolFunc = () => {
      val pool = new JdbcConnectionPool(config, props)
      sys.addShutdownHook {
        pool.close()
      }
      pool
    }
    new MysqlPool(createMysqlPoolFunc)
  }
}
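Usage again mirrors the Redis pool. A minimal sketch, assuming a SparkUtils.buildMysqlProps() helper and that JdbcConnectionPool exposes getConnection/returnConnection like RedisConnectionPool above; adjust to the real pool API:
val mysqlConfig = ssc.sparkContext.broadcast(SparkUtils.buildMysqlProps()) // assumed helper
.....
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionRecords => {
    val pool = MysqlPool(mysqlConfig.value).pool
    val conn = pool.getConnection // assumed JdbcConnectionPool API
    try {
      while (partitionRecords.hasNext) {
        val record = partitionRecords.next()
        // write the record with the pooled JDBC connection
      }
    } finally {
      pool.returnConnection(conn) // assumed, mirrors RedisConnectionPool above
    }
  })
})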
1.5 How to update a broadcast variable in Spark-streaming
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.Calendar
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.slf4j.{Logger, LoggerFactory}
import scala.reflect.ClassTag
/**
 * Created by lancerlin on 2018/2/1.
 *
 * Wrapper for updating a broadcast variable. Usage:
 *
 * val value1 = MyBroadcastWrapper(ssc, new Date().toString)
 *
 * processedDStream.foreachRDD { rdd =>
 *   value1.update(new Date().toString, true, 120000)
 *   rdd.foreachPartition { partitionOfRecords =>
 *     while (partitionOfRecords.hasNext) {
 *       val str = partitionOfRecords.next()
 *     }
 *     LOG.warn(s"broadcast value @@@@ [{${value1.value}}])")
 *   }
 * }
 */
// This wrapper lets us update broadcast variables within DStreams' foreachRDD
// without running into serialization issues
case class MyBroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  var lastUpdatedAt = Calendar.getInstance.getTime

  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false, updateInterval: Integer): Unit = {
    val currentDate = Calendar.getInstance.getTime
    val diff = currentDate.getTime - lastUpdatedAt.getTime
    // Refresh every 60 seconds by default; adjust this interval to your needs
    var interval = 60000
    if (updateInterval != null && updateInterval >= 0) {
      interval = updateInterval
    }
    if (v == null || diff > interval) {
      lastUpdatedAt = Calendar.getInstance.getTime
      if (v != null) {
        v.unpersist(blocking)
      }
      v = ssc.sparkContext.broadcast(newValue)
    }
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
Usage:
val value = MyBroadcastWrapper(ssc, new Date().toString)
processedDStream.foreachRDD { rdd =>
  // The default is to refresh once a minute; passing 120000 changes it to every two minutes
  value.update(new Date().toString, true, 120000)
  rdd.foreachPartition { partitionOfRecords =>
    while (partitionOfRecords.hasNext) {
      val str = partitionOfRecords.next()
    }
    LOG.warn(s"broadcast value @@@@ [{${value.value}}])")
  }
}
Test result: with a 10-second batch interval, the value logged by LOG.warn(s"broadcast value @@@@ [{${value.value}}])") only changes every two minutes.

References:
https://stackoverflow.com/questions/33372264/how-can-i-update-a-broadcast-variable-in-spark-streaming
https://www.jianshu.com/p/5dbb102cbbd9