这两天在用Spark查询MongoDB数据。之前别人做了一些功能所以代码拿过来修改下直接用。但是发现个问题,源程序只是获得数据做Map操作,而我要做SQL查询。官方文档的示范程序要用rdd.withPipeline(Seq(matchQuery,projection1))实现查询条件的设置。但是我总是报错。
查了下以为文档有问题,或者是版本不兼容。最后看到官方文档Java语言的说明,提示要用singletonList替代Seq。试了下果然可以,但是复杂的查询条件使用singletonList又不方便。也没有scala对于这个接口的用法。
最后找到Mongo-Spark在Git上的示范程序,运行竟然Seq都正常。对比了一下终于发现了问题。
原来我的程序别人是用Java开发然后利用iDEA自动翻译成Scala了,所以使用的是JavaSparkContext,然后基于它load出的RDD类型是JavaMongoRDD,而这个RDD调用withPipeline时就必须用java提供的singletonList接口,不能用Seq。
正确的用法是跟官方scala的示范程序一样,使用SparkContext为参数,通过val rdd = MongoSpark.load(spark.sparkContext)返回正经的MongoRDD。然后就可以跟官方Scala文档一样去开发了。
总结,还是要学会Scala才好,用IDE自动转换的代码风险还是比较高。
网友评论