美文网首页
如何通过Spark快速加载MongoDB数据

如何通过Spark快速加载MongoDB数据

作者: 郭彦超 | 来源:发表于2019-08-15 10:45 被阅读0次

    我们的场景内容和表单数据都存储在mongo里面,现需要将这些数据同步到数据仓库用于后期分析,数据体量很大 如何快速同步是个问题?

    1、登陆mongo查看索引及索引字段类型

    {
            "v" : 1,
            "key" : {
                "sceneId" : 1
            },
            "name" : "sceneId",
            "ns" : "eqs_scene.#",
            "sparse" : false,
            "background" : false
    }
    
    "sceneId" : NumberLong(8973702)
    

    这两个操作非常总要,如果没有索引需提前创建,我们接下来会借助spark条件下推的方式拉取数据,这种方式比在spark 全量load后进行条件过滤快上千倍

    2、同步脚本

    ./bin/spark-shell --master yarn --packages "com.stratio.datasource:spark-mongodb_2.11:0.12.0" --num-executors 10
    
    import org.apache.spark.sql.types._ 
    
    #sceneId的定义要保持和MongoDB中数据类型一致,这里使用long类型
    val schemaMongo = new StructType().add("elementsJson",StringType) .add("sceneId", LongType)
    #spark自动识别scheme会先load全量数据,会执行很长的时间,这里使用提前定义好的scheme  
    val df = spark.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "#:27010", "database" -> "#", "collection" -> "#", "credentials"->"#,#,#")).load
    
    df.createOrReplaceTempView("t1")
    
    val df2 = sql("select  cast(t.id as long) from   eqxdb.eqs_scene t  where publish_date='2018-12-21'")
    
    val ids = df2.map(_.getLong(0)).collectAsList()
    #获取需要下推的Id列表 并转化为scala序列
    import scala.collection.JavaConverters
    #注意数组中的对象类型应为long类型 和scheme定义保持一致 :Seq[Long]
    val sid=JavaConverters.asScalaIteratorConverter(ids.iterator()).asScala.toSeq
    
    #查询下推至mongo 数据秒出
    df.where(df("sceneId").isin(sid:_*)).show
    
    

    3、总结

    1、如果下推数据量很少,但执行任务长期卡主不动的话,需要查看dataframe中的索引字段名称和类型是否与mongo库中的一致
    2、credentials为认证,需依次提供3个参数:用户名、数据库名、密码
    3、构建df时需提前查看mongo库的数据结构来定义schema,spark反射出的数据结构会有问题且整个过程很慢

    相关文章

      网友评论

          本文标题:如何通过Spark快速加载MongoDB数据

          本文链接:https://www.haomeiwen.com/subject/fceajctx.html