Hands-on Class | A Step-by-Step Guide to the MongoDB Spark Connector


Author: Alibaba Cloud Database | Published 2019-10-30 10:25

    Why Spark with MongoDB?

    1. High performance: Spark is officially claimed to be up to 100x faster, largely because it can keep working sets entirely in memory.
    2. Easy to use: Java, Python, Scala, and SQL are all supported, which makes building analytics applications straightforward.
    3. Unified: multiple data sources are supported, and Spark RDDs abstract away the differences between them, so the same analytics application can run against different sources.
    4. Broad applicability: both batch and stream processing are supported.

    The MongoDB Spark Connector is the official connector for accessing MongoDB data from Spark. This article uses Python to walk through the connector and help you build your first analytics application on top of MongoDB.

    Prepare the MongoDB environment

    To install MongoDB, see Install MongoDB Community Edition on Linux
    (https://docs.mongodb.com/manual/administration/install-on-linux)

    <pre>mkdir mongodata
    mongod --dbpath mongodata --port 9555
    </pre>
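Before moving on, it can help to confirm that mongod is actually accepting connections on the chosen port. Here is a minimal check using only the Python standard library (the helper name `is_listening` is mine, for illustration; the host and port match the mongod started above):

```python
import socket

def is_listening(host, port, timeout=2.0):
    """Return True if a TCP server accepts connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# mongod was started above with --port 9555
print(is_listening("127.0.0.1", 9555))
```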
    

    Prepare the Spark Python environment

    See PySpark - Quick Guide
    (https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm)
    Download Spark:

    <pre>cd /home/mongo-spark
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
    tar zxvf spark-2.4.4-bin-hadoop2.7.tgz
    </pre>
    

    Set the Spark environment variables

    <pre>export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
    export PATH=$PATH:$SPARK_HOME/bin
    export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
    export PATH=$SPARK_HOME/python:$PATH
    </pre>
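The py4j version bundled under $SPARK_HOME/python/lib differs between Spark releases, so hard-coding the zip name in PYTHONPATH is brittle. A small stdlib-only helper to locate whatever version is actually shipped (the function name `find_py4j` is mine, for illustration):

```python
import glob
import os

def find_py4j(spark_home):
    """Return the py4j source zip bundled with a Spark distribution, or None."""
    matches = glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip"))
    return matches[0] if matches else None
```

Appending the result to PYTHONPATH instead of a pinned filename keeps the setup working across Spark versions.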
    

    Run a Spark RDD example

    <pre># count.py
    from pyspark import SparkContext

    sc = SparkContext("local", "count app")
    words = sc.parallelize([
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ])
    counts = words.count()
    print("Number of elements in RDD → %i" % counts)
    </pre>

    Submit it to Spark:

    <pre>$SPARK_HOME/bin/spark-submit count.py
    Number of elements in RDD → 8
    </pre>
    

    If the program above runs successfully, your Spark Python environment is ready. You can also try Spark's other RDD operations, such as collect, filter, map, reduce, and join; for more examples, see PySpark - Quick Guide
    (https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm)
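The RDD transformations mentioned above behave much like Python's own map/filter/reduce. As a quick mental model only (plain Python, no Spark involved), here is what those operations do on the same word list:

```python
from functools import reduce

words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark", "pyspark and spark"]

# filter: keep elements matching a predicate, like words.filter(lambda w: "spark" in w)
spark_words = [w for w in words if "spark" in w]

# map: transform each element, like words.map(lambda w: (w, 1))
pairs = [(w, 1) for w in words]

# reduce: combine elements pairwise, like counts.reduce(lambda a, b: a + b)
total = reduce(lambda a, b: a + b, (c for _, c in pairs))

print(spark_words)  # ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
print(total)        # 8
```

The real RDD methods evaluate lazily and run distributed, but the element-wise semantics are the same.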

    Accessing MongoDB data from Spark

    See the Spark Connector Python Guide
    (https://docs.mongodb.com/spark-connector/master/python-api)
    Prepare the test data: insert three documents into test.coll01 and leave test.coll02 empty.

    <pre>mongo --port 9555
    > db.coll01.find()
    { "_id" : 1, "type" : "apple", "qty" : 5 }
    { "_id" : 2, "type" : "orange", "qty" : 10 }
    { "_id" : 3, "type" : "banana", "qty" : 15 }
    > db.coll02.find()
    </pre>
    

    Prepare the processing script: it filters the documents in the input collection by a condition and writes the matches to the output collection.

    <pre># mongo-spark-test.py
    from pyspark.sql import SparkSession

    # Create the Spark session
    spark = SparkSession \
        .builder \
        .appName("myApp") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll02") \
        .getOrCreate()

    # Read from MongoDB
    df = spark.read.format("mongo").load()
    df.show()

    # Filter and write to the output collection
    df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()

    # Use SQL
    # df.createOrReplaceTempView("temp")
    # some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
    # some_fruit.show()
    </pre>
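To make the filter step concrete: df['qty'] >= 10 corresponds to the MongoDB predicate {"qty": {"$gte": 10}} on each document. Applied to the three sample documents in plain Python (this only illustrates the predicate, not the connector itself):

```python
docs = [
    {"_id": 1, "type": "apple", "qty": 5},
    {"_id": 2, "type": "orange", "qty": 10},
    {"_id": 3, "type": "banana", "qty": 15},
]

# the rows df.filter(df['qty'] >= 10) keeps and appends to the output collection
kept = [d for d in docs if d["qty"] >= 10]
print([d["_id"] for d in kept])  # [2, 3]
```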
    

    Run the script

    <pre>$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py

    mongo --port 9555
    > db.coll02.find()
    { "_id" : 2, "qty" : 10, "type" : "orange" }
    { "_id" : 3, "qty" : 15, "type" : "banana" }
    </pre>
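As a side note, the commented-out SQL branch in the script selects fruit types containing the letter "e" (LIKE '%e%'). On the sample data that matches "apple" and "orange"; the equivalent check in plain Python:

```python
rows = [("apple", 5), ("orange", 10), ("banana", 15)]

# SQL: SELECT type, qty FROM temp WHERE type LIKE '%e%'
some_fruit = [(t, q) for t, q in rows if "e" in t]
print(some_fruit)  # [('apple', 5), ('orange', 10)]
```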
    


        Original link: https://www.haomeiwen.com/subject/gjfovctx.html