我们的场景内容和表单数据都存储在mongo里面,现需要将这些数据同步到数据仓库用于后期分析,数据体量很大 如何快速同步是个问题?
1、登陆mongo查看索引及索引字段类型
{
"v" : 1,
"key" : {
"sceneId" : 1
},
"name" : "sceneId",
"ns" : "eqs_scene.#",
"sparse" : false,
"background" : false
}
"sceneId" : NumberLong(8973702)
这两个操作非常总要,如果没有索引需提前创建,我们接下来会借助spark条件下推的方式拉取数据,这种方式比在spark 全量load后进行条件过滤快上千倍
2、同步脚本
./bin/spark-shell --master yarn --packages "com.stratio.datasource:spark-mongodb_2.11:0.12.0" --num-executors 10
import org.apache.spark.sql.types._
#sceneId的定义要保持和MongoDB中数据类型一致,这里使用long类型
val schemaMongo = new StructType().add("elementsJson",StringType) .add("sceneId", LongType)
#spark自动识别scheme会先load全量数据,会执行很长的时间,这里使用提前定义好的scheme
val df = spark.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "#:27010", "database" -> "#", "collection" -> "#", "credentials"->"#,#,#")).load
df.createOrReplaceTempView("t1")
val df2 = sql("select cast(t.id as long) from eqxdb.eqs_scene t where publish_date='2018-12-21'")
val ids = df2.map(_.getLong(0)).collectAsList()
#获取需要下推的Id列表 并转化为scala序列
import scala.collection.JavaConverters
#注意数组中的对象类型应为long类型 和scheme定义保持一致 :Seq[Long]
val sid=JavaConverters.asScalaIteratorConverter(ids.iterator()).asScala.toSeq
#查询下推至mongo 数据秒出
df.where(df("sceneId").isin(sid:_*)).show
3、总结
1、如果下推数据量很少,但执行任务长期卡主不动的话,需要查看dataframe中的索引字段名称和类型是否与mongo库中的一致
2、credentials为认证,需依次提供3个参数:用户名、数据库名、密码
3、构建df时需提前查看mongo库的数据结构来定义schema,spark反射出的数据结构会有问题且整个过程很慢
网友评论