Using MongoDB with Spark

Author: 不愿透露姓名的李某某 | Published 2019-08-17 11:44
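Both examples below need the MongoDB Spark Connector on the classpath in addition to Spark itself. A minimal sbt sketch; the versions here are assumptions and should be pinned to match your Spark and Scala installation:

    // build.sbt (a sketch; adjust versions to your cluster's Spark/Scala)
    libraryDependencies ++= Seq(
      "org.apache.spark"  %% "spark-core" % "2.2.0" % "provided",
      "org.apache.spark"  %% "spark-sql"  % "2.2.0" % "provided",
      "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
    )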

1. The Spark RDD approach

Load the MongoDB collection as a MongoRDD[Document] via the connector, then compute PV (total document count) and UV (distinct openid count) with plain RDD transformations.

    package cn.edu360.day2

    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.rdd.MongoRDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.bson.Document

    /**
      * Created by zx on 2017/10/8.
      * https://docs.mongodb.com/spark-connector/current/
      * https://docs.mongodb.com/spark-connector/current/scala-api/
      */
    object MongoSparkRDD {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("MongoSparkRDD")
          .setMaster("local[*]")
          // input.uri: the collection to read; output.uri: where results are written
          .set("spark.mongodb.input.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.logs")
          .set("spark.mongodb.output.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.result")

        // Create the SparkContext (the RDD / Spark Core entry point)
        val sc = new SparkContext(conf)

        // Load the whole collection as an RDD of BSON Documents
        val docsRDD: MongoRDD[Document] = MongoSpark.load(sc)

        // Optional Spark-side filter; guard against documents missing the field.
        // If the filtered RDD is reused, cache it first.
        // val filtered: RDD[Document] = docsRDD.filter { doc =>
        //   val age = doc.get("age")
        //   if (age == null) false else age.asInstanceOf[Double] >= 31
        // }

        // Alternative: push the filter down into MongoDB as an aggregation pipeline
        // val filtered = docsRDD.withPipeline(Seq(Document.parse("{ $match: { age : { $gt : 31 } } }")))
        // println(filtered.collect().toBuffer)

        // PV: total number of log documents
        val pv = docsRDD.count()
        // UV: number of distinct users, identified by openid
        val uv = docsRDD.map(doc => doc.getString("openid")).distinct().count()
        println("pv: " + pv + " uv: " + uv)

        // Writing an RDD[Document] back to the output collection:
        // val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{age: $i}")))
        // MongoSpark.save(documents)

        sc.stop()
      }
    }
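Computing uv with distinct() ships every openid to Spark before deduplicating. As a sketch (assuming the same docsRDD as above), withPipeline can push the deduplication into MongoDB's aggregation pipeline, so far fewer documents cross the wire; note the pipeline runs per partition, so a final distinct on the Spark side guards against an openid spanning partition boundaries:

    // A sketch: group by openid inside MongoDB, then dedupe and count in Spark.
    // Runs against the same bike.logs collection configured above.
    val perUser = docsRDD.withPipeline(Seq(
      Document.parse("{ $group: { _id: \"$openid\" } }")
    ))
    val uvPushedDown = perUser.map(_.getString("_id")).distinct().count()
    println("uv (pushed down): " + uvPushedDown)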

2. The Spark SQL approach

The same PV/UV statistics can be computed with Spark SQL: load the collection as a DataFrame, register it as a temporary view, run a single aggregate query, and write the result back to MongoDB with MongoSpark.save.

    package cn.edu360.day2

    import com.mongodb.spark.MongoSpark
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
      * Created by zx on 2017/10/8.
      * https://docs.mongodb.com/spark-connector/current/scala/datasets-and-sql/
      */
    object MongoSparkSQL {

      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder()
          .master("local")
          .appName("MongoSparkConnectorIntro")
          .config("spark.mongodb.input.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.logs")
          .config("spark.mongodb.output.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.result")
          .getOrCreate()

        // Load the collection as a DataFrame (schema inferred by sampling documents)
        val df: DataFrame = MongoSpark.load(session)
        df.createTempView("v_logs")

        // Earlier experiments against a different view, kept for reference:
        // val result = session.sql("SELECT age, name FROM v_student WHERE age >= 30 ORDER BY age DESC")
        // val result = session.sql("SELECT age, name FROM v_student WHERE age is null")

        // PV and UV computed in a single SQL query
        val uv = session.sql("select count(*) pv, count(distinct openid) uv from v_logs")
        // uv.show()  // uncomment to preview before saving

        // Write the result DataFrame to the output collection
        MongoSpark.save(uv)

        session.stop()
      }
    }
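To confirm the save worked, the result collection can be read back through the connector's DataFrame source (run this before session.stop()). A minimal sketch, assuming the bike.result collection from the output URI above; com.mongodb.spark.sql.DefaultSource is the connector's DataFrame source name:

    // Read the output collection back as a DataFrame and preview it.
    // The URI is an assumption, matching spark.mongodb.output.uri above.
    val saved = session.read
      .format("com.mongodb.spark.sql.DefaultSource")
      .option("uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.result")
      .load()
    saved.show()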
