背景交代
由于我们应用系统使用的是mongo,所以每次操作结果都要输出到MongoDB方便使用 。
思路
1、遇到这样的情景我第一时间打开Spark官网 Mongo数据源
2、根据教程添加好依赖以后,开始配置链接信息。
3、根据教程创建Mongo的配置文件发现不少坑。
采坑
坑一
image.png官方提供的demo都是不需要登录凭证的。
坑二
image.png1、找到配置用户密码的缺发现在0.12.* 版本中 MongodbCredentials 类已经移动包名了,直接拷贝文档会报错。
2、MongodbConfigBuilder 0.12.* 。MongodbConfigBuilder(map:Map,list:List)更本没有这个构造函数。
3、源码配置文件没有注释,对于英语不好的我有点D疼。
最终在查看和调试源码下找到关键字段
image.png1、上面代码大致意思以 MongodbConfig.Credentials 为key 读取config中的属性并转换成[List[MongodbCredentials]] 对象,如果为空则获取默认的配置。最后map 成MongoCredential 对象。 从这里可以看出我们只需要配置MongodbConfig.Credentials 中配置用户密码就好了。
配置代码
package cn.harsons.mbd.util
import cn.harsons.mbd.util.Config._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import com.stratio.datasource.mongodb.config.{MongodbConfigBuilder, MongodbCredentials}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* mongoDB操作工具
*
* @author liyabin
* @date 2020/3/12 0012
*/
object MongoUtils {
/**
* 使用 mongoDB 集成方式 写入mongo
*
* @param collectionName 集合名称
* @param data 数据集
* @param sparkSession spark 对象
*/
def saveToMongo(collectionName: String, data: DataFrame, sparkSession: SparkSession): Unit = {
// 已经引入配置对象,可以直接使用配置对象的属性
// Credentials 认证 需要用户名和密码
val saveConfig = MongodbConfigBuilder(
Map(Host -> List(mongo_host),
Database -> mongo_database,
Collection -> collectionName,
SamplingRatio -> 1.0,
WriteConcern -> "normal",
SplitSize -> 8,
SplitKey -> "_id",
Credentials -> List(MongodbCredentials(mongo_user, mongo_authentication, mongo_password.toCharArray)))
)
data.saveToMongodb(saveConfig.build())
}
/**
* 使用 Spark 原生的方式 写入mongo
*
* @param collectionName 集合名称
* @param data 数据集
* @param mode 数据写入模式 (覆盖、追加等)
* @param sparkSession sparkSession 对象
*/
def writeToMongo(collectionName: String, data: DataFrame, mode: SaveMode, sparkSession: SparkSession): Unit = {
// Credentials 官网上要求 如果是String 类型 使用这种方式 配置用户凭证 user,authDataBase,password
val options = Map("host" -> mongo_host,
Credentials -> (mongo_user + "," + mongo_authentication + "," + mongo_password),
"database" -> mongo_database,
"collection" -> collectionName)
data.write.format("com.stratio.datasource.mongodb").mode(mode)
.options(options)
.save()
}
}
网友评论