1. 广播变量
我们知道spark 的广播变量允许缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。常见于spark在一些全局统计的场景中应用。通过广播变量,能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
2. Spark Streaming 广播变量的更新
广播变量的声明很简单,调用broadcast就能搞定,并且scala中一切可序列化的对象都是可以进行广播的,这就给了我们很大的想象空间,可以利用广播变量将一些经常访问的大变量进行广播,而不是每个任务保存一份,这样可以减少资源上的浪费。
但是,现在项目中遇到一种这样的需求,用spark streaming 通过一些离线全局更新好的数据对用户进行实时推荐(当然这里基于一些spark streaming的内部机制,不能实现真正的时效性):(1)日志流通过kafka获取 (2) 解析日志流数据,融合离线的全局数据,对每个Dtream进行计算(3)计算结果最后发送到redis中。
其中就会涉及这样的问题:(1)离线全局的数据是需要全局获取的,不能局部进行计算 (2)这部分数据是离线定期更新的,而spark streaming一旦开始,就长时间运行。如果离线数据更新了,如何在开始的流计算中,获取到这部分更新后的数据。
针对上述问题,我们可以直接想的一种方法是,在driver端开启一个附属线程,周期性去获取离线的全局数据,然后通过diver分发到各个task中。但是考虑到这种方式:spark streaming整体的性能开销会很大,并且重新开启的后台线程的不易管理。结合spark中的广播变量,我们采用另一种方式来解决以上问题:
1> spark中的广播变量是只读的,通过unpersist函数,可以内存中的相关序列化对象
2> 通过Dstream的foreachRDD方法,做到定时更新 (官网上有说明,该方法是在driver端执行的)
import java.io.{ObjectInputStream, ObjectOutputStream}
import com.bf.dt.wireless.config.WirelessConfig
import com.bf.dt.wireless.formator.WirelessFormator
import com.bf.dt.wireless.storage.MysqlConnectionPool
import com.bf.dt.wireless.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s._
import org.slf4j.LoggerFactory
import scala.collection.mutable
object WirelessLogAnalysis {
object BroadcastWrapper {
@volatile private var instance: Broadcast[Map[String, List[String]]] = null
private val map = mutable.LinkedHashMap[String, List[String]]()
def getMysql(): Map[String, List[String]] = {
//1.获取mysql连接池的一个连接
val conn = MysqlConnectionPool.getConnection.get
//2.查询新的数据
val sql = "select aid_type,aids from cf_similarity"
val ps = conn.prepareStatement(sql)
val rs = ps.executeQuery()
while (rs.next()) {
val aid = rs.getString("aid_type")
val aids = rs.getString("aids").split(",").toList
map += (aid -> aids)
}
//3.连接池回收连接
MysqlConnectionPool.closeConnection(conn)
map.toMap
}
def update(sc: SparkContext, blocking: Boolean = false): Unit = {
if (instance != null)
instance.unpersist(blocking)
instance = sc.broadcast(getMysql())
}
def getInstance(sc: SparkContext): Broadcast[Map[String, List[String]]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(getMysql)
}
}
}
instance
}
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(instance)
}
private def readObject(in: ObjectInputStream): Unit = {
instance = in.readObject().asInstanceOf[Broadcast[Map[String, List[String]]]]
}
}
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(this.getClass)
val conf = new SparkConf()
.setAppName("wirelessLogAnalysis")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaConfig: Map[String, String] = Map(
"metadata.broker.list" -> WirelessConfig.getConf.get.getString("wireless.metadata.broker.list"),
"group.id" -> WirelessConfig.getConf.get.getString("wireless.group.id"),
"zookeeper.connect" -> WirelessConfig.getConf.get.getString("wireless.zookeeper.connect"),
"auto.offset.reset" -> WirelessConfig.getConf.get.getString("wireless.auto.offset.reset")
)
val androidvvTopic = WirelessConfig.getConf.get.getString("wireless.topic1")
val iphonevvToplic = WirelessConfig.getConf.get.getString("wireless.topic2")
val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaConfig,
Set(androidvvTopic, iphonevvToplic)
)
//原始日志流打印
kafkaDStream.print()
val jsonDstream = kafkaDStream.map(x =>
//解析日志流
WirelessFormator.format(x._2)
)
//解密的日志流打印
jsonDstream.print()
jsonDstream.foreachRDD {
rdd => {
// driver端运行,涉及操作:广播变量的初始化和更新
// 可以自定义更新时间
if ((DateUtils.getNowTime().split(" ")(1) >= "08:00:00") && (DateUtils.getNowTime().split(" ")(1) <= "10:10:00")) {
BroadcastWrapper.update(rdd.sparkContext, true)
println("广播变量更新成功: " + DateUtils.getNowTime())
}
//worker端运行,涉及操作:Dstream数据的处理和Redis更新
rdd.foreachPartition {
partitionRecords =>
//1.获取redis连接,保证每个partition建立一次连接,避免每个记录建立/关闭连接的性能消耗
partitionRecords.foreach(
record => {
//2.处理日志流
val uid = record._1
val aid_type = record._2 + "_" + record._3
if (cf.value.keySet.contains(aid_type)) {
(uid, cf.value.get(aid_type))
println((uid, cf.value.get(aid_type)))
}
else
(uid, "-1")
}
//3.redis更新数据
)
//4.关闭redis连接
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
说明:以上是无线推荐项目中部分代码,其中离线全局数据存储在mysql中,
MysqlConnectionPool是mysql连接池定义类,WirelessFormator是日志解密的定义类
网友评论