Apache Spark连接MongoDB

作者: Michaelhbjian | 来源:发表于2018-01-18 20:19 被阅读746次

    大数据处理引擎Apache Spark与Mongodb相结合,构建一个复杂的实时分析系统。通过spark-mongodb连接器可以将spark与mongodb数据库连接起来。

    image.png

    1.前提

    • 安装并运行Mongodb
    • Spark 2.1
    • Scala 2.11

    2.安装MongoDB(通过yum安装)

    MongoDB安装教程

    2.1配置yum源

    vim /etc/yum.repos.d/mongodb-org-3.4.repo
    

    添加以下内容:

    [mongodb-org-3.4]
    name=MongoDB Repository
    baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/3.4/x86_64/
    gpgcheck=1
    enabled=1
    gpgkey=https://www.mongodb.org/static/pgp/server-3.4.asc
    

    2.2安装

    yum install -y mongodb-org
    

    安装过程中出现验证错误的情况,则取消gpgcheck的验证。修改mongodb-org-3.4.repo文件后重新安装:

    //取消验证
    gpgcheck=0
    

    2.3yum安装后的文件位置

    安装完后配置文件在:/etc/mongod.conf

    数据文件在:/var/lib/mongo

    日志文件在:/var/log/mongodb

    2.4启动服务

    // 启动 
    systemctl start  mongod.service
    // 检查是否启动
    systemctl status  mongod.service
    // 关闭 
    systemctl stop  mongod.service
    // 重启服务
    systemctl restart  mongod.service
    // 设置开机启动
    systemctl enable  mongod.service
    

    2.5访问权限控制

    MongoDB服务默认只绑定在本机IP上,即只有本机才能访问MongoDB,我们可以修改访问权限控制让外网也能访问。

    修改配置文件/etc/mongod.conf将其中的bindip:127.0.0.1注释即可。

    image.png

    2.6进入mongodb命令行

    image.png

    3.运行mongo-spark

    本文使用mongodb官方的MongoDB Connector for Apache Spark,先下载MongoDB Connector for Apache Spark ,MongoDB Connector for Apache Spark的使用方法非常简单,到spark的安装的bin目录下执行下面语句:

    按照官方文档启动Spark Connector Scala Guide

    ./spark-shell  --conf "spark.mongodb.input.uri=mongodb://localhost:27017/test.test?authSource=admin" --conf "spark.mongodb.output.uri=mongodb://localhost:27017/test.test?authSource=admin" --packages org.mongodb.spark:mongo-spark-connector_2.10:2.2.1
    
    //test是数据库名称。myCollection是集合名称
    //读配置
    The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) from which to read data, and the read preference.
    //写配置
    The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data. Connects to port 27017 by default.
    //这里mongo-spark的包的要换成对应的版本号
    The packages option specifies the Spark Connector’s Maven coordinates, in the format groupId:artifactId:version.
    
    image.png

    4.运行实例

    导入MongoDB连接包

    import com.mongodb.spark._
    

    在读写mongodb数据库时会自动连接mongodb。

    import org.bson.Document
    

    使用Datasets and SQL,对mongodb数据库进行增删改查。

    import org.apache.spark.sql.SparkSession
    
    val sparkSession = SparkSession.builder().master("local").appName("MongoSparkConnectorIntro").config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.test").config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test").getOrCreate()
    
    import com.mongodb.spark._
    import org.bson.Document
    val df = MongoSpark.load(sparkSession)
    df.printSchema() 
    

    在Spark使用结束时,务必使用 :quit退出。否则将导致错误。

    5.MongoDB的基本操作

    传统数据库与Mongodb数据库结构区别图

    image.png

    创建数据库

    // 如果数据库不存在则创建,否则切换到指定数据库(当没有进行操作然后离开的时候会自动删除该数据库)
    use DATABASE_NAME  
    db 查看当前数据库名
    show dbs 查看所有数据库
    show tables 查看集合
    

    删除数据库

    db.dropDatabase() 删除当前数据库
    db.collection.drop() 删除集合,collection为集合名,例db.student.drop()
    

    创建集合

    >db.test.insert({"id":"1002", 
        "name": "lisi",
    })
    
    image.png

    test是我们的集合名,如果该集合不在数据库中,Mongodb会自动创建该集合并插入文档。

    //查看已插入的文档
    db.test.find();
    

    删除集合

    db.test.drop();
    
    image.png
    插入文档
    //单条文档插入
    >db.test.insert({"id":10,"name":"zhangsan"})
    //循环批量插入文档
    >for(var i=1;i<=20;i++) {db.users.insert({"id":10,"name":"zhangsan"})}
    
    更新文档

    update()方法

    db.collection.update(
        <query>,  // 查询条件
        <update>,  //update的对象和一些更新的操作符
        {
            upsert: <boolean>,  // 如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入
            multi: <boolean>,   // mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。
            writeConcern: <document>  // 可选,抛出异常的级别
        }
    )
    
    所有修改器包括:
     $set 对某一个键值进行修改,如果没有就添加 ex:db.person.update({_id:1},{$set:{name:"lwd"}})
     $inc 只使用于数值类型,对其进行加减操作 ex:db.person.update({_id:1},{$inc:{id:1}})
     $unset 删除某指定的键 ex:db.person.update({_id:1},{$unset{id:1}})
     $push 只使用于数组类型,在数组里面加值 ex:db.person.update({_id=1},{$push:{class:"1"}})
     $pushAll 只使用于数组类型,批量在数组价值 ex:db.person.update({_id=1},{$pullAll:class:["1","2","3"]})
     $addToSet 只使用于数组类型,当没有的时候才会进行操作 ex:db.person.update({_id=1},{$addToSet:{name:"lwd"}})
     $pop 只使用于数组,删除某一个元素-1为第一个 1为最后一个 ex:db.person.update({_id=1},{$pop:{name:1}})
     $pull 删除某一个元素 直接写要删除的东西 ex:db.person.update({_id=1},{$pull:{name:"lwd"}})
     $pullAll 一次性删除某元素 ex:db.person.update({_id:1},{$pullAll:{class:["1","2"]}})
     $addToSet与$each批量数组更新 ex: db.person.update({_id:1},{$adaToSet:{class:{$each:["1","2","3"]}}})
    
    

    例子
    db.test.update({"name":"lisi"},{$set:{"name":"wangwu"}})

    以上语句只会修改第一条发现的文档,如果你要修改多条相同的文档,则需要设置multi参数为true

    image.png

    save()方法

    db.collection.save(
        <document>, // 传入文档用来替换之前的文档
        {
            writeConcern: <document>
        }
    )
    
    //实例
    db.test.save(
        {"id":"1003","name":"zhaoliu"}
    )
    
    image.png

    save()方法与insert的区别:

    save()方法可以插入相同的id的数据(id为系统自动生成的那个_id),而insert插入会出错。

    image.png
    删除文档

    remove:删除表中的记录(根据条件删除)

    db.collection.remove(
        <query>,       // 可选,删除的文档的条件
        {
            justOne:<boolean>,   // 可选,如果为true或1,则只删除一个文档
            writeConcern: <document>  // 可选,抛出异常级别
        }
    )
    

    例子
    db.test.remove({"id":"1003"})

    db.test.remove({}) 删除所有数据

    image.png
    查询文档
    db.test.find() 查看已插入的文档
    db.test.find().pretty() 输出好看的格式
    db.test.findOne() 只返回一个文档,一般是返回第一个文档
    指定返回的键:db.文档名.find({条件},{键指定}) 
    
    image.png

    条件操作符

    (>) 大于 – $gt
    (<) 小于 – $lt
    (>=) 大于等于 – $gte
    (<= ) 小于等于 – $lte
    db.test.find({"id" : {$gt : "1002"}}) 查找likes大于100的数据
    
    image.png

    AND条件

    传入多个键值然后逗号隔开
    db.test.find({key1: value1, key2: value2}).pretty()
    db.test.find({"id":"1002","name":"wangwu"})
    
    image.png

    OR条件

     >db.test.find({$or:[{"id":"1002"}, {"name":"jerry"}]}).pretty()
    
    image.png

    limit and skip

    db.test.find().limit(2) 只读取两条
    db.test.find().limit(1).skip(1) 跳过第一条,只显示第二条
    skip默认为0
    
    image.png

    sort()方法

    1升序,-1降序
    db.test.find().sort({"id": 1})
    //第一个括弧中时查询条件,第二个括弧中显示内容:1会显示出来字段,0不会显示字段
    db.test.find({}, {"id": 1, "_id": 0}).sort({"id": -1})
    
    image.png

    参考资料

    https://www.mongodb.com/products/spark-connector
    https://docs.mongodb.com/spark-connector/current/
    https://university.mongodb.com/activate?key=03adb7a09af048bdb68e9703bb11c512&utm_campaign=verification&utm_medium=email&utm_source=university
    https://github.com/mongodb/mongo-spark?jmp=hero
    http://blog.csdn.net/chenguohong88/article/details/77850882
    http://www.36dsj.com/archives/80662

    相关文章

      网友评论

        本文标题:Apache Spark连接MongoDB

        本文链接:https://www.haomeiwen.com/subject/xfawoxtx.html