美文网首页
spark-mongodb简单上手

spark-mongodb简单上手

作者: Josen_Qu | 来源:发表于2018-06-15 14:02 被阅读45次

    Spark提供的所有计算,不管是批处理,Spark SQL,Spark Streaming还是Spark ML,它们底层都是通过RDD计算。所以这里就以RDD方式简单上手。首先认识一下RDD:RDD(Resilient Distributed Dataset)是Spark最基础核心的概念,它表示可分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD可以缓存到内存或磁盘中,每次对RDD数据集的操作之后的结果可以复用,省去了MapReduce大量的磁盘IO操作。这对于迭代运算频繁的不同维度的批处理、机器学习算法、交互式数据挖掘来说,效率提高很多。
    方便起见,这里Spark使用的本地单机模式,需要在本地安装Spark及配置环境变量,由于Spark是Scala写的,也要安装Scala及环境变量的配置,注意对应的Scala版本要匹配。代码使用Maven构建工程。项目中,数据库使用Mongdb,编程语言Java,业务意义就是统计今年申报出口业务中每家企业的报关单量。

    pom文件部分:

      <dependency>
    
      <groupId>org.apache.spark</groupId>
    
      <artifactId>spark-core_2.11</artifactId>
    
      <version>2.2.0</version>
    
      </dependency>
    
      <dependency>
    
      <groupId>org.mongodb.spark</groupId>
    
      <artifactId><u>mongo</u>-spark-connector_2.11</artifactId>
    
      <version>2.2.1</version>
    
      </dependency>
    
      <dependency>
    
      <groupId>org.apache.spark</groupId>
    
      <artifactId>spark-sql_2.11</artifactId>
    
      <version>2.2.0</version>
    
      </dependency>
    

    代码部分:

            // 创建 一个 spark session,可以配置输入源,输出源,spark运行模式,spark应用实例名称等
            SparkSession spark = SparkSession.builder().master("local").appName("MongodbSparkDemoTest")
                    .config("spark.mongodb.input.uri",
                            "mongodb://user:password@localhost:port/db.collection")
                    .config("spark.mongodb.output.uri",
                            "mongodb://user:password@localhost:port/db.collection2")
                    .getOrCreate();
    
            // 创建一个javaspark 上下文
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
            // 使用mongodb 聚合管道获取数据 生成一个RDD
            JavaMongoRDD<Document> aggregatedRdd = MongoSpark.load(jsc).withPipeline(Collections.singletonList(Aggregates
                    .match(Filters.and(Filters.eq("iEMark", "E"), Filters.gte("declareDate", "20180101000000")))));
            // 原始数据的样子
            System.out.println("原始数据的样子");
            aggregatedRdd.take(5).forEach((t) -> {
                System.out.println(t);
            });
            ;
            // map 将一条记录映射成 key - value 的形式,key,value都可以是多个字段
            JavaPairRDD<String, Long> mapRdd = aggregatedRdd.mapPartitionsToPair((Iterator<Document> t) -> {
                List<Tuple2<String, Long>> list = new ArrayList<>();
                while (t.hasNext()) {
                    Document doc = t.next();
                    list.add(new Tuple2<String, Long>(doc.getString("tradeCode"), (long) 1));
                }
                return list.iterator();
            });
            // map后的样子
            System.out.println("map后的样子");
            mapRdd.take(5).forEach((t) -> {
                System.out.println(t);
            });
            // reduce 将相同key的value聚合
            JavaPairRDD<String, Long> reducedRdd = mapRdd.reduceByKey((Long v1, Long v2) -> {
                return v1 + v2;
            });
            // reduce 后的样子
            System.out.println("reduce后的样子,企业编码-报关单量");
            reducedRdd.take(5).forEach((t) -> {
                System.out.println(t);
            });
            // 转换成mongodb文档格式
            JavaRDD<Document> reducedDocRdd = reducedRdd.map((Tuple2<String, Long> v1) -> {
                return new Document().append("tradeCode", v1._1).append("entryQty", v1._2);
            });
            // 统计结果保存到mongodb
            MongoSpark.save(reducedDocRdd);
    
            jsc.close();
    

    Console输出:


    图片.png

    相关文章

      网友评论

          本文标题:spark-mongodb简单上手

          本文链接:https://www.haomeiwen.com/subject/xbfmeftx.html