美文网首页spark
Spark 保存数据到MongoDB

Spark 保存数据到MongoDB

作者: 灬臣独秀灬 | 来源:发表于2020-03-12 15:11 被阅读0次

    背景交代

    由于我们应用系统使用的是mongo,所以每次操作结果都要输出到MongoDB方便使用 。

    思路

    1、遇到这样的情景我第一时间打开Spark官网 Mongo数据源
    2、根据教程添加好依赖以后,开始配置链接信息。
    3、根据教程创建Mongo的配置文件发现不少坑。

    采坑
    坑一
    image.png

    官方提供的demo都是不需要登录凭证的。

    坑二
    image.png

    1、找到配置用户密码的缺发现在0.12.* 版本中 MongodbCredentials 类已经移动包名了,直接拷贝文档会报错。
    2、MongodbConfigBuilder 0.12.* 。MongodbConfigBuilder(map:Map,list:List)更本没有这个构造函数。
    3、源码配置文件没有注释,对于英语不好的我有点D疼。

    最终在查看和调试源码下找到关键字段
    image.png

    1、上面代码大致意思以 MongodbConfig.Credentials 为key 读取config中的属性并转换成[List[MongodbCredentials]] 对象,如果为空则获取默认的配置。最后map 成MongoCredential 对象。 从这里可以看出我们只需要配置MongodbConfig.Credentials 中配置用户密码就好了。

    配置代码
    package cn.harsons.mbd.util
    
    import cn.harsons.mbd.util.Config._
    import com.stratio.datasource.mongodb._
    import com.stratio.datasource.mongodb.config.MongodbConfig._
    import com.stratio.datasource.mongodb.config.{MongodbConfigBuilder, MongodbCredentials}
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    /**
      * mongoDB操作工具
      *
      * @author liyabin
      * @date 2020/3/12 0012
      */
    object MongoUtils {
      /**
        * 使用 mongoDB 集成方式 写入mongo
        *
        * @param collectionName 集合名称
        * @param data           数据集
        * @param sparkSession   spark 对象
        */
      def saveToMongo(collectionName: String, data: DataFrame, sparkSession: SparkSession): Unit = {
        // 已经引入配置对象,可以直接使用配置对象的属性
        // Credentials 认证 需要用户名和密码
        val saveConfig = MongodbConfigBuilder(
          Map(Host -> List(mongo_host),
            Database -> mongo_database,
            Collection -> collectionName,
            SamplingRatio -> 1.0,
            WriteConcern -> "normal",
            SplitSize -> 8,
            SplitKey -> "_id",
            Credentials -> List(MongodbCredentials(mongo_user, mongo_authentication, mongo_password.toCharArray)))
        )
        data.saveToMongodb(saveConfig.build())
      }
    
      /**
        * 使用 Spark 原生的方式 写入mongo
        *
        * @param collectionName 集合名称
        * @param data           数据集
        * @param mode           数据写入模式 (覆盖、追加等)
        * @param sparkSession   sparkSession 对象
        */
      def writeToMongo(collectionName: String, data: DataFrame, mode: SaveMode, sparkSession: SparkSession): Unit = {
        // Credentials 官网上要求 如果是String 类型 使用这种方式 配置用户凭证  user,authDataBase,password
        val options = Map("host" -> mongo_host,
          Credentials -> (mongo_user + "," + mongo_authentication + "," + mongo_password),
          "database" -> mongo_database,
          "collection" -> collectionName)
        data.write.format("com.stratio.datasource.mongodb").mode(mode)
          .options(options)
          .save()
      }
    
    }
    
    

    相关文章

      网友评论

        本文标题:Spark 保存数据到MongoDB

        本文链接:https://www.haomeiwen.com/subject/rpnujhtx.html