Spark: Serialization During Task Execution

Author: 布莱安托 | Published 2020-07-07 14:47

    Let's start with an example:

    /*
      We define a Search class that takes a single String parameter, query.
      The class has three methods:
      1) isMatch: checks whether the input string s contains the substring query
      2) getMatchRdd1: filters the RDD by passing the isMatch method to filter
      3) getMatchRdd2: filters the RDD with an anonymous function defined inline
     */
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    class Search(query: String) {
      def isMatch(s: String): Boolean = {
        s.contains(query)
      }
    
      def getMatchRdd1(rdd: RDD[String]): RDD[String] = {
        rdd.filter(isMatch)
      }
    
      def getMatchRdd2(rdd: RDD[String]): RDD[String] = {
        rdd.filter(_.contains(query))
      }
    
    }
    
    object SerializableDemo {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[4]").setAppName("SerializableDemo")
        val sc = new SparkContext(conf)
    
        val rdd = sc.parallelize(Array("hello", "world", "hello", "spark"))
    
        val search = new Search("h")
    
        val matchRdd = search.getMatchRdd2(rdd)
        matchRdd.collect().foreach(println)
    
        sc.stop()
    
      }
    }
    

    Running it produces:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:388)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:387)
    at adamlee.spark.Search.getMatchRdd2(SerializableDemo.scala:34)
    at adamlee.spark.SerializableDemo$.main(SerializableDemo.scala:16)
    at adamlee.spark.SerializableDemo.main(SerializableDemo.scala)
    Caused by: java.io.NotSerializableException: adamlee.spark.Search
    Serialization stack:
    - object not serializable (class: adamlee.spark.Search, value: adamlee.spark.Search@4cafa9aa)
    - field (class: adamlee.spark.Search$$anonfun$getMatchRdd2$1, name: $outer, type: class adamlee.spark.Search)
    - object (class adamlee.spark.Search$$anonfun$getMatchRdd2$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    ... 12 more

    The error says the task could not be serialized, and the Caused by line pins down why: object not serializable, i.e. the Search instance itself could not be serialized.
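    To see what the JVM is objecting to independently of Spark, you can try to serialize a Search instance by hand with plain Java serialization, which is essentially what Spark's JavaSerializer does to the closure. A minimal sketch, reusing the Search class from above:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // Throws java.io.NotSerializableException: Search does not extend Serializable
    out.writeObject(new Search("h"))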

    The root cause: the search object is created on the Driver. When we call collect, the job is triggered and the Driver has to ship the filter task to the Executors. This is inter-process communication over the network, and only binary bytes travel over the wire, so the task closure and everything it references must be serializable. The anonymous function in getMatchRdd2 reads query, a constructor parameter of Search, so it implicitly captures this (the $outer field in the serialization stack above); and since Search does not extend Serializable, serializing the closure fails.
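    Given this, there is a second fix besides marking the class Serializable: copy the field into a local val inside the method, so the closure captures only a String rather than the whole object. A minimal sketch of this variant (Search2 and getMatchRdd3 are names invented here for illustration):

    class Search2(query: String) {
      def getMatchRdd3(rdd: RDD[String]): RDD[String] = {
        // Copy the constructor parameter into a local val first: the
        // closure below then captures only this String, not the enclosing
        // (non-serializable) Search2 instance through $outer.
        val q = query
        rdd.filter(_.contains(q))
      }
    }

    Since String is serializable, this version runs even though Search2 itself is not.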

    Now let's define a new class, Search1, which extends Serializable:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    class Search1(query: String) extends Serializable {
      def isMatch(s: String): Boolean = {
        s.contains(query)
      }
    
      def getMatchRdd1(rdd: RDD[String]): RDD[String] = {
        rdd.filter(isMatch)
      }
    
      def getMatchRdd2(rdd: RDD[String]): RDD[String] = {
        rdd.filter(_.contains(query))
      }
    
    }
    
    object SerializableDemo {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[4]").setAppName("SerializableDemo")
        val sc = new SparkContext(conf)
    
        val rdd = sc.parallelize(Array("hello", "world", "hello", "spark"))
    
        val search1 = new Search1("h")
    
        val matchRdd = search1.getMatchRdd2(rdd)
        matchRdd.collect().foreach(println)
    
        sc.stop()
    
      }
    }
    

    Running it now produces:

    hello
    hello
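    With Search1 marked Serializable, the entire instance can be shipped to the Executors, so the method-reference variant works as well. A quick check, reusing rdd and search1 from above:

    // getMatchRdd1 passes the isMatch method to filter, which likewise drags
    // the enclosing instance into the closure; now that Search1 is
    // Serializable, this also succeeds and prints "hello" twice.
    search1.getMatchRdd1(rdd).collect().foreach(println)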
