Where does Spark Streaming code run?

Author: 肌霸 | Published 2018-05-31 17:14

    During an internship project that used Spark Streaming to compute fully refreshed data in real time, I became confused about where the code in a job actually runs (on the Driver or on the Executors?), so I ran the test below.
    The conclusions are:

    • A variable defined on the Driver is null on the Executors unless it is broadcast, even if its type is a distributed one such as Dataset (a minimal sketch of the broadcast fix follows this list; the full test program comes after it).
    • Code inside the transform and foreachRDD operators runs on the Driver, so Driver-side variables are usable there; operations applied to the underlying RDD, such as map and foreachPartition, run on the Executors.
    • If the variable defined on the Driver is a Dataset, calling map on that Dataset still runs on the Executors.
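
    A minimal sketch of the first point (hypothetical names, not part of the original test): a variable that is only set on the Driver is not shipped to the Executors, but a broadcast variable is, so Executor-side closures such as rdd.map can read the broadcast copy.

    import org.apache.spark.sql.SparkSession

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[2]").appName("BroadcastSketch").getOrCreate()
        val sc = spark.sparkContext

        // a small Driver-side lookup table, e.g. collected from a dimension table
        val skuCodes: Map[Long, String] = Map(1L -> "SKU-1", 2L -> "SKU-2")

        // broadcasting ships a read-only copy of the map to every Executor
        val skuCodesBc = sc.broadcast(skuCodes)

        val result = sc.parallelize(Seq(1L, 2L, 3L))
          .map(id => id -> skuCodesBc.value.getOrElse(id, "unknown")) // runs on the Executors
          .collect()

        result.foreach(println) // printed on the Driver
        spark.stop()
      }
    }

    The original test program follows.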
    import java.util

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object sqlWordCount {
    
      private var product_skuDataSet:Dataset[Row] = null
      private var kafkaParams :Map[String,Object] = null
      private val mysql_url = ""
    
      def getDimensionRDD(spark:SparkSession, url: String, table: String): Dataset[Row] = {
        // printed on the Driver
        System.err.println("get dimensionRDD")
    
        var rowRDD: Dataset[Row] = null
        val prop: util.Map[String, String] = new util.HashMap[String, String]
        prop.put("url", url)
        prop.put("dbtable", table)
        prop.put("driver", "com.mysql.jdbc.Driver")
        rowRDD = spark.read.format("jdbc").options(prop).load.coalesce(1)
        rowRDD
      }
        def main(args: Array[String]) {
    
          val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    
          if(args.length>0) sparkConf.setMaster("local[4]")
    
          val ssc = new StreamingContext(sparkConf, Seconds(5))
    
          kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "kafka1:9092,kafka2:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "want",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
          )
    
          val topics = Array("test")
          val lines = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
          )
    
          val words = lines.map(record => record.value).flatMap(_.split(" "))
          words.transform(rdd =>{
    
            val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
    
            if (product_skuDataSet == null ) {
              System.err.println("get product_sku from mysql")
              product_skuDataSet = getDimensionRDD(spark, mysql_url, "product_sku")
            }
    
            import spark.implicits._
    
            val wordsDataFrame = rdd.map(w =>{
              /**
                * printed on the Executor
                */
              System.err.println("execute map here:"+ Thread.currentThread().getName)
    
              /**
                * The Executor cannot obtain a reference to this variable, even though it is a Dataset;
                * uncommenting the line below throws a NullPointerException.
                */
    //          product_skuDataSet.createOrReplaceTempView("product_sku")
    
              Record(w)
            } ).toDF()
    
            product_skuDataSet.limit(100).coalesce(2).map(row => sku(row.getLong(0),row.getString(1)))
                .foreachPartition(iterator =>{
                  while(iterator.hasNext){
                    val sku = iterator.next()
                    /**
                      * printed on the Executor
                      */
                    System.err.println("run in:"+Thread.currentThread().getName)
                    System.err.println(sku.id+": "+sku.sku_code)
                  }
                })
    
            wordsDataFrame.createOrReplaceTempView("words")
    
            val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
            /**
              * printed on the Driver
              */
            wordCountsDataFrame.show()
    
            rdd
          }).foreachRDD(rdd => {}) // an empty output operation; without it the transform above would never run
    
          ssc.start()
          ssc.awaitTermination()
        }
      }
    
    
    /** Case class for converting RDD to DataFrame */
    case class Record(word: String)
    
    case class sku(id: Long, sku_code: String)
    
    /** Lazily instantiated singleton instance of SparkSession */
    object SparkSessionSingleton {
    
      @transient
      private var instance: SparkSession = _
    
      def getInstance(sparkConf: SparkConf): SparkSession = {
        /**
          * executed on the Driver
          */
        if (instance == null) {
          System.err.println("init sparkSession here")
          instance = SparkSession
            .builder
            .config(sparkConf)
            .getOrCreate()
        }
        instance
      }
    }
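
    One possible way to actually use the dimension table in this test (a hypothetical sketch, not from the original post): since the body of transform runs on the Driver, the Dataset can be registered as a temp view there and joined with the words table in SQL, instead of being referenced from inside rdd.map.

    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object JoinSketch {
      // Hypothetical helper, assumed to be called from inside the transform body (i.e. on the Driver),
      // where the reference to the dimension Dataset is still valid.
      def joinWithSku(spark: SparkSession,
                      productSku: Dataset[Row],
                      wordsDataFrame: DataFrame): DataFrame = {
        productSku.createOrReplaceTempView("product_sku")   // Driver side: this works here
        wordsDataFrame.createOrReplaceTempView("words")

        // the query is planned on the Driver; the scan and join themselves execute on the Executors
        spark.sql(
          """SELECT w.word, s.sku_code
            |FROM words w
            |LEFT JOIN product_sku s ON w.word = s.sku_code""".stripMargin)
      }
    }

    The join key (word = sku_code) is only for illustration; the point is that Dataset operations are driven from the Driver while the actual work runs on the Executors.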
    
