美文网首页我爱编程
「Mongo」聚合操作与清洗重复数据项

「Mongo」聚合操作与清洗重复数据项

作者: HughDong | 来源:发表于2018-05-13 22:31 被阅读0次

    使用Mongo聚合操作来进行重复的数据项清洗,并使用PyMongo加入到数据清洗组件中。

    当前环境:PyMongo 3.6.1 / MongoDB 3.4.7 / Python 3.6.4 :: Anaconda, Inc.

    在爬虫中断续爬时会出现少量数据重复的问题,我将数据去重放在了数据清洗环节,清洗的过程中顺带将重复的数据删除。
    Mongo老版本的解决方案是建立单一索引,Mongo3.+可以使用聚合操作将重复的数据检索出来并进行删除。
    元数据结构如下:

    item = { 
        "_id" : ObjectId("..."), 
        "title" : "...",     # 数据标题
        "date" : "...",      # 数据日期
        "url" : "...",       # 数据来源
        "content" : "...", 
        "source" : "..."
        "category" : "...", 
        ...
    }
    

    需要根据「相同标题+相同日期+相同来源」判定数据重复,在管道中根据这三项条件分组($group)后计数将数量>1的匹配($match)出来,最后遍历删除(db.collections.remove())

    聚合操作的过程
    $group: 使用title/date/url作为条件进行分组组成新的_id,并计数+1,dups中存放元数据的_id
    $match: 在$group得到的分组基础上匹配数量>1的项



    Mongo Shell 查询重复数据的操作如下:

    db.test.aggregate([
        {
            $group: { _id: {'title': '$title','date':'$date','url': '$url'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
        },
        {
            $match: {count: {$gt: 1}}
        }
    ])
    

    Mongo Shell 将查询到的结果删除操作:

    db.test.aggregate([
        ...                                     // 同上聚合操作,此处略
    ]).forEach(function(doc){
        doc.dups.shift();                       // 去除重复组的第一个元数据_id,得到除第一个之外的其他元组
        db.test.remove({_id: {$in: doc.dups}}); // remove()删除这些重复的数据
    })
    

    PyMongo 操作代码如下:
    使用bulk_write()进行批量删除

    pipeline = [
        {
            '$group': {
                '_id': {'title': '$title', 'date': '$date', 'url': '$url'},
                'count': {'$sum': 1},
                'dups': {
                    '$addToSet': '$_id'
                }
            },
        },
        {
            '$match': {
                'count': {
                    '$gt': 1
                }
            }
        }
    ]
    
    map_id = map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=pipeline))
    list_id = [item for sublist in map_id for item in sublist]
    print(db['data_value'] \
          .bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id))) \
          .bulk_api_result)
    

    一行代码鬼畜版:

    print(db['data_value'].bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [item for sublist in map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=[{'$group': {'_id': {'title': '$title', 'date': '$date', 'url': '$url'},'count': {'$sum': 1},'dups': {'$addToSet': '$_id'}},},{'$match': {'count': {'$gt': 1}}}])) for item in sublist]))).bulk_api_result)
    

    相关文章

      网友评论

        本文标题:「Mongo」聚合操作与清洗重复数据项

        本文链接:https://www.haomeiwen.com/subject/wasrdftx.html