
Hitting a Wall with Spark + MongoDB

Author: 或然子 | Published 2017-05-29 16:28

    While experimenting with Spark + MongoDB I keep running into "Cursor xxxxx not found" errors; adding keep_alive_ms and a pipeline did not fix the problem either.

    The total data volume is currently around 10,000 records, which are loaded from MongoDB and handed to Spark's NaiveBayes training.

        # Pass the aggregation pipeline stages as a JSON string
        # (the original set/dict literal is not valid Python).
        pipeline = '[{ "$limit": 5000 }, { "$skip": 2000 }]'
        has_train = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
            .option("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
            .option("spark.mongodb.keep_alive_ms", "3600000") \
            .option("pipeline", pipeline) \
            .load()
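
    The training step itself is not shown here; below is a minimal sketch of what it could look like with the DataFrame-based pyspark.ml API (the "text" and "label" column names and the hashing featurization are assumptions, not taken from the original code).

        # Sketch only: assumes has_train exposes a raw-text column "text" and a numeric "label".
        from pyspark.ml import Pipeline
        from pyspark.ml.classification import NaiveBayes
        from pyspark.ml.feature import HashingTF, Tokenizer

        tokenizer = Tokenizer(inputCol="text", outputCol="words")        # split text into tokens
        hashing_tf = HashingTF(inputCol="words", outputCol="features")   # hash tokens into a sparse feature vector
        nb = NaiveBayes(labelCol="label", featuresCol="features")

        model = Pipeline(stages=[tokenizer, hashing_tf, nb]).fit(has_train)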
    

    Before 1.6 we had to deploy the third-party jars by hand and tell Spark where to load them from; when experimenting with 2.1, the packages declared with --packages are downloaded automatically.

    # --files takes a single comma-separated list; repeating the flag keeps only the last value
    ./spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
        --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
        --py-files ./utility.py \
        --files ./dicts/statistic_college.txt,./dicts/degrees.txt,./dicts/diming.txt,./dicts/subjects.txt,./dicts/training_org.txt \
        naive_bayes.py
    

    Output:

    # ./submit.sh 
    Ivy Default Cache set to: /root/.ivy2/cache
    The jars for the packages stored in: /root/.ivy2/jars
    :: loading settings :: url = jar:file:/home/pluto/spark/spark-2.1.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 in central
        found org.mongodb#mongo-java-driver;3.2.2 in central
    :: resolution report :: resolve 221ms :: artifacts dl 4ms
        :: modules in use:
        org.mongodb#mongo-java-driver;3.2.2 from central in [default]
        org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/7ms)
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    

    When the data volume grows I always hit the error below; I have not yet tracked down the root cause :(

    17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
    17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:2, serverValue:42}] to mongo_and_spark_server:27017 because the pool has been closed.
    17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
    17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:4, serverValue:46}] to mongo_and_spark_server:27017 because the pool has been closed.
    17/05/29 01:27:56 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 5, mongo_and_spark_server, executor 2): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34611963569 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
        at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
        at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
        at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
        at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
    
    17/05/29 01:27:56 INFO TaskSetManager: Starting task 3.1 in stage 2.0 (TID 6, mongo_and_spark_server, executor 2, partition 3, ANY, 6787 bytes)
    17/05/29 01:29:01 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 4, mongo_and_spark_server, executor 0): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34615739977 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
        at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
        at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
        at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
        at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
    
    17/05/29 01:29:01 INFO TaskSetManager: Starting task 2.1 in stage 2.0 (TID 7, mongo_and_spark_server, executor 2, partition 2, ANY, 6799 bytes)
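
    One workaround worth trying (my own assumption, not something verified in this post) is to materialize the MongoDB read before the long-running training stage, so Spark never goes back to a server-side cursor that may already have timed out; raising the server's cursorTimeoutMillis parameter is another avenue.

        # Workaround sketch (assumption, not verified): cache the data and force the full read
        # up front, while the MongoDB cursors are still alive.
        from pyspark import StorageLevel

        has_train = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
            .option("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
            .load() \
            .persist(StorageLevel.MEMORY_AND_DISK)

        has_train.count()  # triggers the scan now instead of during NaiveBayes training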
    

    References:

    https://docs.mongodb.com/spark-connector/v2.0/configuration/#spark-input-conf
    https://docs.mongodb.com/manual/core/aggregation-pipeline-optimization/
    http://www.mongoing.com/tj/mongodb_shanghai_spark
