由于实习做的一个项目用 SparkStreaming 计算全量实时更新的数据,产生了对任务运行过程中代码运行位置的困惑( Driver 端执行还是 Executor 端执行?)做了以下测试。
得出的结论有:
- 在 Driver 端定义的变量如果不广播,则在 Executor 端为 null,即使变量类型为分布式的 DataSet
- transform,foreachRDD 算子中的代码在 Driver 端运行,所以 Driver 端的变量对其是可以使用的,但是对于细化到 RDD 的算子中的运算如 map,foreachPartition,则是在 Executor 端运行
- 在 Driver 端定义的变量若为 DataSet,则再使用 map 算子是在 Executor 端运行的
object sqlWordCount {
private var product_skuDataSet:Dataset[Row] = null
private var kafkaParams :Map[String,Object] = null
private val mysql_url = ""
def getDimensionRDD(spark:SparkSession, url: String, table: String): Dataset[Row] = {
// 打印在 driver
System.err.println("get dimensionRDD")
var rowRDD: Dataset[Row] = null
val prop: util.Map[String, String] = new util.HashMap[String, String]
prop.put("url", url)
prop.put("dbtable", table)
prop.put("driver", "com.mysql.jdbc.Driver")
rowRDD = spark.read.format("jdbc").options(prop).load.coalesce(1)
rowRDD
}
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
if(args.length>0) sparkConf.setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
kafkaParams = Map[String, Object](
"bootstrap.servers" -> "kafka1:9092,kafka2:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "want",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val words = lines.map(record => record.value).flatMap(_.split(" "))
words.transform(rdd =>{
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
if (product_skuDataSet == null ) {
System.err.println("get product_sku from mysql")
product_skuDataSet = getDimensionRDD(spark, mysql_url, "product_sku")
}
import spark.implicits._
val wordsDataFrame = rdd.map(w =>{
/**
* 打印在 executor 端
*/
System.err.println("execute map here:"+ Thread.currentThread().getName)
/**
* executor 无法获得变量的引用,即使这个变量是dataset
* 报 NullPointException
*/
// product_skuDataSet.createOrReplaceTempView("product_sku")
Record(w)
} ).toDF()
product_skuDataSet.limit(100).coalesce(2).map(row => sku(row.getLong(0),row.getString(1)))
.foreachPartition(iterator =>{
while(iterator.hasNext){
val sku = iterator.next()
/**
* 在 executor 端输出
*/
System.err.println("run in:"+Thread.currentThread().getName)
System.err.println(sku.id+": "+sku.sku_code)
}
})
wordsDataFrame.createOrReplaceTempView("words")
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
/**
* driver 端打印
*/
wordCountsDataFrame.show()
rdd
}).foreachRDD(rdd =>{})
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
case class sku(id:Long , sku_code:String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient
private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
/**
* driver 端执行
*/
if (instance == null) {
System.err.println("init sparkSession here")
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
网友评论