Solutions for the Spark SerializedLambda Error

Author: 大猪大猪 | Published 2018-08-06 15:25

    When developing Spark programs in IDEA, you may hit a lambda-related serialization exception. This post reproduces the exception and shows two ways to fix it.

    Example

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    
    public class SimpleApp {
        public static void main(String[] args) {
            String logFile = "/soft/dounine/github/spark-learn/README.md"; // Should be some file on your system
            SparkConf sparkConf = new SparkConf()
                    .setMaster("spark://localhost:7077")
                    .setAppName("Demo");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            JavaRDD<String> logData = sc.textFile(logFile).cache();
    
            // JDK 8 lambda: serialized as java.lang.invoke.SerializedLambda.
            long numAs = logData.filter(s -> s.contains("a")).count();
    
            // Anonymous inner class implementing Spark's Function interface.
            long numBs = logData.map(new Function<String, Integer>() {
                @Override
                public Integer call(String v1) throws Exception {
                    return v1.contains("b") ? 1 : 0; // 1 if the line contains "b"
                }
            }).reduce((a, b) -> a + b);
    
            System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
            sc.stop();
        }
    }
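
    Every function above crosses the driver/executor boundary: the filter and reduce lambdas, as well as the anonymous map Function, are serialized on the driver and deserialized on the executors (Spark's org.apache.spark.api.java.function.Function extends java.io.Serializable), so the executors must be able to load the application's classes.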
    

    Because the job uses JDK 1.8 lambda expressions, the first action fails on the executors with the following exception:

    18/08/06 15:18:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.0.107, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2290)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2208)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2208)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2208)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2208)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 1]
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 2, 192.168.0.107, executor 0, partition 1, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 3, 192.168.0.107, executor 0, partition 0, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 2) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 2]
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 4, 192.168.0.107, executor 0, partition 1, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 3) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 3]
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 5, 192.168.0.107, executor 0, partition 0, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 5) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 4]
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 6, 192.168.0.107, executor 0, partition 0, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 1.2 in stage 0.0 (TID 4) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 5]
    18/08/06 15:18:41 INFO TaskSetManager: Starting task 1.3 in stage 0.0 (TID 7, 192.168.0.107, executor 0, partition 1, PROCESS_LOCAL, 7898 bytes)
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 6]
    18/08/06 15:18:41 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    18/08/06 15:18:41 INFO TaskSetManager: Lost task 1.3 in stage 0.0 (TID 7) on 192.168.0.107, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1) [duplicate 7]
    18/08/06 15:18:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    18/08/06 15:18:41 INFO TaskSchedulerImpl: Cancelling stage 0
    18/08/06 15:18:41 INFO DAGScheduler: ResultStage 0 (count at SimpleApp.java:19) failed in 1.113 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 192.168.0.107, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
        ... (same stack trace as above, elided)
    
    Driver stacktrace:
    18/08/06 15:18:41 INFO DAGScheduler: Job 0 failed: count at SimpleApp.java:19, took 1.138497 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 192.168.0.107, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
        ... (same stack trace as above, elided)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
        at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455)
        at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45)
        at com.dounine.spark.learn.SimpleApp.main(SimpleApp.java:19)
    Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
        ... (same stack trace as above, elided)
    18/08/06 15:18:41 INFO SparkContext: Invoking stop() from shutdown hook
    18/08/06 15:18:41 INFO SparkUI: Stopped Spark web UI at http://lake.dounine.com:4040
    18/08/06 15:18:41 INFO StandaloneSchedulerBackend: Shutting down all executors
    18/08/06 15:18:41 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
    18/08/06 15:18:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    18/08/06 15:18:41 INFO MemoryStore: MemoryStore cleared
    18/08/06 15:18:41 INFO BlockManager: BlockManager stopped
    18/08/06 15:18:41 INFO BlockManagerMaster: BlockManagerMaster stopped
    18/08/06 15:18:41 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    18/08/06 15:18:41 INFO SparkContext: Successfully stopped SparkContext
    18/08/06 15:18:41 INFO ShutdownHookManager: Shutdown hook called
    18/08/06 15:18:41 INFO ShutdownHookManager: Deleting directory /tmp/spark-cf16df6e-fd04-4d17-8b6a-a6252793d0d5
    

    The root cause is that the application JAR was never shipped to the workers. A JDK 8 lambda is serialized as a java.lang.invoke.SerializedLambda, and deserializing it on an executor requires the class that defined the lambda so the JVM can resolve it back into a function object. When that class is missing from the executor's classpath, the raw SerializedLambda instance ends up assigned to the Function-typed field, which is exactly the ClassCastException above.
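
    The mechanism can be reproduced outside Spark. The sketch below (our illustration; the class name LambdaSerDemo and the use of java.util.function.Function are assumptions, not part of the original post) serializes a lambda and deserializes it in the same JVM, where resolution succeeds because the defining class is on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.function.Function;
    
    public class LambdaSerDemo {
        public static void main(String[] args) throws Exception {
            // A lambda is serializable only if its target type is Serializable;
            // on write it is replaced by a java.lang.invoke.SerializedLambda.
            Function<String, Boolean> f =
                    (Function<String, Boolean> & Serializable) s -> s.contains("a");
    
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(f);
            }
    
            // On read, the JVM asks the defining class (LambdaSerDemo) to
            // recreate the lambda. Here that class is on the classpath, so it
            // works; on a Spark executor without the application JAR, the
            // lambda cannot be resolved, causing the ClassCastException above.
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                @SuppressWarnings("unchecked")
                Function<String, Boolean> g = (Function<String, Boolean>) ois.readObject();
                System.out.println(g.apply("spark")); // prints: true
            }
        }
    }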

    Solution 1

    Set the path to the application JAR so Spark distributes it to the workers:

    SparkConf sparkConf = new SparkConf()
            .setMaster("spark://lake.dounine.com:7077")
            // Ship the application JAR to every executor so it can load
            // the classes that define the lambdas.
            .setJars(new String[]{"/soft/dounine/github/spark-learn/build/libs/spark-learn-1.0-SNAPSHOT.jar"})
            .setAppName("Demo");
    
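    Note that setJars is mainly needed when the driver is launched straight from the IDE, as in this post. If you package the application and launch it with spark-submit, the primary JAR is distributed to the executors automatically. Either way, the path must point to the freshly built artifact (the build/libs location above suggests a Gradle build), so rebuild the JAR after every code change.
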

    Solution 2

    Use local mode during development:

    SparkConf sparkConf = new SparkConf()
            .setMaster("local") // run everything in the driver JVM
            .setAppName("Demo");
    
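    In local mode the driver and the executor share a single JVM, so any deserialized function can always find its defining class and the error cannot occur. local uses a single worker thread; a common variant (our suggestion, not from the original post) is local[*], which uses one thread per CPU core:

    SparkConf sparkConf = new SparkConf()
            .setMaster("local[*]") // one worker thread per available core
            .setAppName("Demo");
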
