1.SparkRDD方式
package cn.edu360.day2
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.rdd.MongoRDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
/**
 * Computes page views (PV) and unique visitors (UV) over a MongoDB collection
 * using the RDD API of the MongoDB Spark connector.
 *
 * PV is the number of documents loaded from the input collection; UV is the
 * number of distinct non-null `openid` values among those documents. Both
 * figures are printed to stdout on a single line.
 *
 * Created by zx on 2017/10/8.
 * https://docs.mongodb.com/spark-connector/current/
 * https://docs.mongodb.com/spark-connector/current/scala-api/
 */
object MongoSparkRDD {

  /**
   * Entry point: loads the input collection, prints PV and UV, then stops Spark.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the connection URIs embed a user name and password; consider
    // supplying them from external configuration instead of source code.
    // NOTE(review): "reslut" looks like a typo for "result" - it is left untouched
    // because it names a live collection; confirm before renaming.
    val conf = new SparkConf()
      .setAppName("MongoSparkRDD")
      .setMaster("local[*]")
      .set("spark.mongodb.input.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.logs")
      .set("spark.mongodb.output.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.reslut")

    // Create the SparkContext (entry point for RDDs / Spark Core).
    val sc = new SparkContext(conf)
    try {
      val docsRDD: MongoRDD[Document] = MongoSpark.load(sc)
      // Two actions run on this RDD below; caching lets the second one reuse the
      // loaded documents instead of reading the collection again.
      docsRDD.cache()

      val pv = docsRDD.count()
      // Documents without an "openid" yield null from getString; drop them so a
      // missing id is not counted as one extra visitor.
      val uv = docsRDD
        .flatMap(doc => Option(doc.getString("openid")).toList)
        .distinct()
        .count()
      println(s"pv: $pv uv: $uv")

      // Example: filter on the Spark side, skipping documents that have no "age".
      // val filtered: RDD[Document] = docsRDD.filter { doc =>
      //   Option(doc.get("age")).exists(_.asInstanceOf[Double] >= 31)
      // }

      // Example: push the filter down to MongoDB as an aggregation pipeline stage.
      // val filtered = docsRDD.withPipeline(Seq(Document.parse("{ $match: { age : { $gt : 31 } } }")))
      // println(filtered.collect().toBuffer)

      // Example: build documents locally.
      // val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{age: $i}")))

      // Example: save a computed result to the collection named by the output URI.
      // MongoSpark.save(filtered)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }

    // Alternative: load the collection as a DataFrame through a SparkSession.
    // val spark = SparkSession.builder()
    //   .master("local")
    //   .appName("MongoSparkConnectorIntro")
    //   .config("spark.mongodb.input.uri", "mongodb://192.168.1.13:27200/niu.bi")
    //   //.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    //   .getOrCreate()
    //
    // val df: DataFrame = MongoSpark.load(spark)
    //
    // df.show()
  }
}
2.SparkSQL方式
package cn.edu360.day2
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Computes page views (PV) and unique visitors (UV) over a MongoDB collection
 * with Spark SQL and writes the one-row result back to MongoDB.
 *
 * The input collection is loaded as a DataFrame, registered as the temporary
 * view `v_logs`, aggregated with a single SQL statement, and the resulting
 * DataFrame is saved through the connector to the configured output URI.
 *
 * Created by zx on 2017/10/8.
 * https://docs.mongodb.com/spark-connector/current/scala/datasets-and-sql/
 */
object MongoSparkSQL {

  /**
   * Entry point: runs the PV/UV aggregation and saves it, then stops the session.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the connection URIs embed a user name and password; consider
    // supplying them from external configuration instead of source code.
    // NOTE(review): "reslut" looks like a typo for "result" - it is left untouched
    // because it names a live collection; confirm before renaming.
    val session = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.logs")
      .config("spark.mongodb.output.uri", "mongodb://xiaoniu:123568@192.168.137.20:27017/bike.reslut")
      .getOrCreate()

    try {
      val logs: DataFrame = MongoSpark.load(session)
      // getOrCreate may hand back an existing session; replacing the view avoids
      // the "already exists" failure that createTempView raises in that case.
      logs.createOrReplaceTempView("v_logs")

      // Other queries that were tried against a student view:
      // val result: DataFrame = session.sql("SELECT age, name FROM v_student WHERE age >= 30 ORDER BY age DESC")
      // val result = session.sql("SELECT age, name FROM v_student WHERE age is null")
      // val pv = session.sql("select count(*) from v_logs")

      // One row with two columns: pv = number of rows, uv = distinct openid values.
      val pvUv = session.sql("select count(*) pv, count(distinct openid) uv from v_logs")
      // pvUv.show()

      // Write the aggregate to the collection named by spark.mongodb.output.uri.
      MongoSpark.save(pvUv)
    } finally {
      // Always release the session, even when loading, querying or saving fails.
      session.stop()
    }
  }
}
网友评论