MongoDB - 聚合

作者: yuanzicheng | 来源:发表于2018-05-07 13:50 被阅读18次

    聚合操作处理数据记录并返回计算结果。聚合操作组将来自多个文档的值组合在一起,并且可以对分组数据执行各种操作以返回单个结果。MongoDB提供了执行聚合的三种方法:Aggregation PipelineMap-ReduceSingle Purpose Aggregation Operations

    1.Aggregation Pipeline

    MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传给下一个管道处理。聚合管道是一个强大的工具,类似shell中的管道,并且它能够支持分片集群。

    聚合管道的语法如下,aggregate()的参数为一个管道操作符的数组。

    db.collection.aggregate( [ { <stage> }, ... ] )
    
    1.1 Aggregation Pipeline Stages

    文档按照顺序通过一组聚合管道,聚合管道支持以下操作符。

    1.1.1 $project

    修改输入文档的结构(不影响原数据):引入、去除指定字段

    > db.user.find()
    { "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
    { "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "abc", "age" : 11 }
    { "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
    > db.user.aggregate([{$project:{_id:0,name:1,age:1}}])
    { "name" : "jack", "age" : 14 }
    { "name" : "abc", "age" : 11 }
    { "name" : "tom", "age" : 12 }
    
    1.1.2 $match

    过滤数据,输出符合条件的文档。

    > db.user.find({"name":"tom"})
    { "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
    > db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$match:{"name":"tom"}}])
    { "name" : "tom", "age" : 12 }
    
    1.1.3 $limit

    限制返回的文档数。

    > db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$limit:1}])
    { "name" : "jack", "age" : 14 }
    
    1.1.4 $skip

    跳过指定数量的文档。

    > db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$skip:2},{$limit:1}])
    { "name" : "tom", "age" : 12 }
    
    1.1.5 $unwind

    将数组类型字段值拆分成多条,每条包含数组的一个元素。

    > db.user.find({"name":"rose"}).limit(1)
    { "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : [ 1, 2, 3, 4 ] }
    > db.user.aggregate([{$match:{"name":"rose"}},{$limit:1},{$unwind:"$role"}])
    { "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 1 }
    { "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 2 }
    { "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 3 }
    { "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 4 }
    
    1.1.6 $group

    分组操作,类似sql中的group by,一般与管道表达式$sum、$min等组合使用。

    语法如下

    { $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
    

    示例

    > db.user.find().limit(3)
    { "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
    { "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "rose", "age" : 12 }
    { "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
    > db.user.aggregate([{$limit:3},{$group:{_id:"$age"}}])
    { "_id" : 12 }
    { "_id" : 14 }
    > db.user.aggregate([{$limit:3},{$group:{_id:"$age","n":{$max:"$name"}}}])
    { "_id" : 12, "n" : "tom" }
    { "_id" : 14, "n" : "jack" }
    
    1.1.7 $sort

    对文档进行排序,1为升序,-1为降序。

    > db.user.aggregate([{$limit:3},{$sort:{age:1}},{$project:{_id:0,name:1,age:1}}])
    { "name" : "rose", "age" : 12 }
    { "name" : "tom", "age" : 12 }
    { "name" : "jack", "age" : 14 }
    > db.user.aggregate([{$limit:3},{$sort:{age:1}},{$sort:{name:-1}},{$project:{_id:0,name:1,age:1}}])
    { "name" : "tom", "age" : 12 }
    { "name" : "rose", "age" : 12 }
    { "name" : "jack", "age" : 14 }
    
    1.1.8 $lookup

    用于多文档连接查询返回关联数据。

    > db.product.find()
    { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" }
    { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" }
    { "_id" : 3, "name" : "产品3", "price" : 13, "type" : "品类2" }
    { "_id" : 4, "name" : "产品4", "price" : 15, "type" : "品类2" }
    { "_id" : 5, "name" : "产品5", "price" : 17, "type" : "品类3" }
    { "_id" : 6, "name" : "产品6", "price" : 19, "type" : "品类3" }
    > db.order.find()
    { "_id" : 1, "product_id" : 1, "name" : "订单1" }
    { "_id" : 2, "product_id" : 2, "name" : "订单2" }
    { "_id" : 3, "product_id" : 2, "name" : "订单3" }
    { "_id" : 4, "product_id" : 1, "name" : "订单4" }
    { "_id" : 5, "name" : "订单5" }
    > db.order.aggregate({$lookup:{from:"product",localField:"product_id",foreignField:"_id",as:"order_product"}})
    { "_id" : 1, "product_id" : 1, "name" : "订单1", "order_product" : [ { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" } ] }
    { "_id" : 2, "product_id" : 2, "name" : "订单2", "order_product" : [ { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" } ] }
    { "_id" : 3, "product_id" : 2, "name" : "订单3", "order_product" : [ { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" } ] }
    { "_id" : 4, "product_id" : 1, "name" : "订单4", "order_product" : [ { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" } ] }
    { "_id" : 5, "name" : "订单5", "order_product" : [ ] }
    

    MongoDB 3.6加入了一些新的特性,可以到官网手册查阅
    https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/

    1.1.9 $geoNear

    输出接近指定地点的文档,LBS相关的应用中常用。
    https://docs.mongodb.com/manual/reference/operator/aggregation/geoNear/

    1.2 Aggregation Pipeline Operators

    聚合管道操作符可以构造用于聚合管道的表达式,聚合表达式的形式如下

    { <operator>: [ <argument1>, <argument2> ... ] }
    

    如果操作符对应单个参数,其表达式形式如下

    { <operator>: <argument> }
    

    MongoDB中支持的管道操作符太多,这里只给出几种常用的。

    1.2.1 $sum 求和
    > db.product.find()
    { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" }
    { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" }
    { "_id" : 3, "name" : "产品3", "price" : 13, "type" : "品类2" }
    { "_id" : 4, "name" : "产品4", "price" : 15, "type" : "品类2" }
    { "_id" : 5, "name" : "产品5", "price" : 17, "type" : "品类3" }
    { "_id" : 6, "name" : "产品6", "price" : 19, "type" : "品类3" }
    > db.product.aggregate([{$group:{_id:"$type",price:{$sum:"$price"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "price" : 21 }
    { "_id" : "品类2", "price" : 28 }
    { "_id" : "品类3", "price" : 36 }
    
    1.2.2 $avg 求平均值
    > db.product.aggregate([{$group:{"_id":"$type","price":{$avg:"$price"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "price" : 10.5 }
    { "_id" : "品类2", "price" : 14 }
    { "_id" : "品类3", "price" : 18 }
    
    1.2.3 $min 取最小值
    > db.product.aggregate([{$group:{"_id":"$type","price":{$min:"$price"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "price" : 10 }
    { "_id" : "品类2", "price" : 13 }
    { "_id" : "品类3", "price" : 17 }
    
    1.2.4 $max 取最大值
    > db.product.aggregate([{$group:{"_id":"$type","price":{$max:"$price"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "price" : 11 }
    { "_id" : "品类2", "price" : 15 }
    { "_id" : "品类3", "price" : 19 }
    
    1.2.5 $push 添加数组元素
    > db.product.aggregate([{$group:{"_id":"$type","products":{$push:"$name"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "products" : [ "产品1", "产品2" ] }
    { "_id" : "品类2", "products" : [ "产品3", "产品4" ] }
    { "_id" : "品类3", "products" : [ "产品5", "产品6" ] }
    
    1.2.6 $addToSet 添加数组元素(无重复)

    $addToSet与$push几乎一致,区别就是不会添加已有值。

    1.2.7 $first 取第一个元素
    > db.product.aggregate([{$group:{"_id":"$type","product":{$first:"$name"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "product" : "产品1" }
    { "_id" : "品类2", "product" : "产品3" }
    { "_id" : "品类3", "product" : "产品5" }
    
    1.2.8 $last 取最后一个元素
    > db.product.aggregate([{$group:{"_id":"$type","product":{$last:"$name"}}},{$sort:{_id:1}}])
    { "_id" : "品类1", "product" : "产品2" }
    { "_id" : "品类2", "product" : "产品4" }
    { "_id" : "品类3", "product" : "产品6" }
    

    更多管道操作相关内容查看官网文档 https://docs.mongodb.com/manual/reference/operator/aggregation/

    2.Map-Reduce Function

    Map-Reduce是一个数据处理模型,它将大量数据分解(Map),再合并成最终结果(Reduce)。

    以下图片是官方手册中Map-Reduce的处理过程的示例:

    map-reduce.png

    Map-Reduce的语法如下([ 参数 ]为可选项):

    db.runCommand(
                   {
                     mapReduce: <collection>,    # 要操作的集合
                     map: <function>,    # 映射函数(生成键值对序列,作为reduce函数参数)
                     reduce: <function>,    # 统计函数
                     [ finalize: <function>, ]    # 对reduce返回结果进行最终整理后存入结果集合
                     [ out: <output>, ]
                     [ query: <document>, ]  # 查询条件
                     [ sort: <document>, ]    # 排序
                     [ limit: <number>, ]    # 限制返回记录数量
                     [ scope: <document>, ]    # 向map、reduce、finalize函数传入外部变量
                     [ jsMode: <boolean>, ]
                     [ verbose: <boolean>, ]    # 显示详细的时间统计信息
                     [ bypassDocumentValidation: <boolean>, ]
                     [ collation: <document> ]
                   }
                 )
    

    参数说明

    • map函数:function(){ emit(key,value) },提交两个参数key和value,数据会根据key的值进行分组,同组的value值存入values中。key和value作为reduce函数的参数。
    • reduce函数:function(key,values){ },参数key是分组字段,values是同组的值,reduce函数中对分组数据进行处理。
    • out:指定结果集保存在哪里,可以是一个集合名称,也可以使用如下文档配置
    # 结果集存放在内存(结果集需要<16M)
    { inline:1 } 
    # 结果集存放在集合中,如果该集合中已经有数据,可以选择以下三种模式处理
    { replace:"collection" }    # 集合中的旧数据被替换,不保留 
    { merge:"collection"}    # 合并含有相同键的结果文档
    { reduce:"collection"}    # 调用reduce函数,根据新值来处理旧集合的值
    

    示例
    准备数据

    > db.order.find()
    { "_id" : ObjectId("5aec0f5733e1b532f26e4c74"), "cust_id" : "A123", "amount" : 500, "status" : "A" }
    { "_id" : ObjectId("5aec0f6333e1b532f26e4c75"), "cust_id" : "A123", "amount" : 250, "status" : "A" }
    { "_id" : ObjectId("5aec0f7333e1b532f26e4c76"), "cust_id" : "B212", "amount" : 200, "status" : "A" }
    { "_id" : ObjectId("5aec0f8333e1b532f26e4c77"), "cust_id" : "A123", "amount" : 300, "status" : "D" }
    

    使用db.collection.mapReduce()的形式,结果集保存到内存

    > db.order.mapReduce(function() {
    ...     emit(this.cust_id, this.amount);
    ... },
    ... function(key, values) {
    ...     return Array.sum(values)
    ... },
    ... {
    ...     query: {
    ...         status: "A"
    ...     },
    ...     out: {
    ...         inline: 1
    ...     },
    ...     finalize: function(key, reduced) {
    ...         return "总数为" + reduced;
    ...     }
    ... })
    {
        "results" : [
            {
                "_id" : "A123",
                "value" : "总数为750"
            },
            {
                "_id" : "B212",
                "value" : "总数为200"
            }
        ],
        "timeMillis" : 14,
        "counts" : {
            "input" : 3,
            "emit" : 3,
            "reduce" : 1,
            "output" : 2
        },
        "ok" : 1
    }
    

    使用db.runCommand()的形式,结果集保存到指定的集合

    > db.runCommand({
    ...     mapReduce: "order",
    ...     map: function() {
    ...         emit(this.cust_id, this.amount);
    ...     },
    ...     reduce: function(key, values) {
    ...         return Array.sum(values)
    ...     },
    ...     query: {
    ...         status: "A"
    ...     },
    ...     out: {
    ...         replace: "order_total"
    ...     },
    ...     finalize: function(key, reduced) {
    ...         return "总数为" + reduced
    ...     }
    ... })
    {
        "result" : "order_total",
        "timeMillis" : 55,
        "counts" : {
            "input" : 3,
            "emit" : 3,
            "reduce" : 1,
            "output" : 2
        },
        "ok" : 1
    }
    > db.order_total.find()
    { "_id" : "A123", "value" : "总数为750" }
    { "_id" : "B212", "value" : "总数为200" }
    

    示例中仅仅演示了求和,除此以外,reduce函数中可以根据需要进行一系列的复杂处理。

    3.Single Purpose Aggregation Methods

    MongoDB同样提供了一些单一用途的聚合函数

    • db.collection.count():返回匹配的文档数

    • db.collection.distinct():以数组形式返回去重的字段值

    > db.user.find()
    { "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
    { "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "rose", "age" : 12 }
    { "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
    > db.user.count({"age":12})
    2
    > db.user.distinct("age")
    [ 14, 12 ]
    

    相关文章

      网友评论

        本文标题:MongoDB - 聚合

        本文链接:https://www.haomeiwen.com/subject/xueyzxtx.html