将Tsv批量导入mongodb

作者: Tim在路上 | 来源:发表于2019-02-18 10:58 被阅读0次

    使用python脚本语言处理数据比较快,同时代码也比较简洁。

    连接mongodb

    1. 导入包
    import pymongo,urllib
    import sys
    from datetime import timedelta
    import os
    import uuid
    

    使用pymongo可以快速的处理与mongodb的事物

    1. 连接mongodb
    conn = pymongo.MongoClient("sv6.aesc.nrse.com",27018)
    
    如果数据库有密码需要先使用admin进行权限认证
    
    db = conn.admin
    
    db.authenticate("root","123456")
    
    mydb = conn.httpsystem
    
    collection = mydb.myCollection
    
    
    1. 使用insert_many
    count = 0
    index = 1
    dlist = []
    with open(output_path) as f:
        for line in f:
            Idict = {}
            fileds = line.split("\t")
            if len(fileds) != 30:
                pass
            else:
                try:
                    Idict["_id"] = fileds[0].strip()
                    Idict["srcIp"] = fileds[1].strip()
                    Idict["srcPort"] = int(fileds[2].strip())
                    Idict["distIp"] = fileds[3].strip()
                    Idict["distPort"] = int(fileds[4].strip())
                    Idict["requestURL"] = fileds[5].strip()
                    Idict["requestMethod"] = fileds[6].strip()
                    Idict["requestUserAgent"] = fileds[7].strip()
                    Idict["requestCookie"] = fileds[8].strip()
                    Idict["responseServer"] = fileds[9].strip()
                    Idict["responseCode"] = int(fileds[10].strip())
                    Idict["requestHeader"] = fileds[11].strip()
                    Idict["requestContType"] = fileds[12].strip()
                    Idict["responseCharset"] = fileds[13].strip()
                    Idict["httpVersion"] = fileds[14].strip()
                    Idict["requestHost"] = fileds[15].strip()
                    Idict["requestBodyString"] = fileds[16].strip()
                    Idict["requestParameterString"] = fileds[17].strip()
                    Idict["responseContentType"] = fileds[18].strip()
                    Idict["responseHeader"] = fileds[19].strip()
                    Idict["responseBodyReference"] = fileds[20].strip()
                    Idict["ML_rule_juge"] = fileds[21].strip()
                    Idict["ML_rule_juge_id"] = fileds[22].strip()
                    Idict["ML_type"] = fileds[23].strip()
                    Idict["ML_juge_mal"] = fileds[24].strip()
                    Idict["ML_juge_type"] = fileds[25].strip()
                    Idict["DLCNN_rule_juge"] = fileds[26].strip()
                    Idict["DLCNN_type"] = fileds[27].strip()
                    Idict["DLCNN_juge_mal"] = fileds[28].strip()
                    Idict["DLCNN_juge_type"] = fileds[29].strip()
                except:
                    count = count + 1
                    print(count)
                dlist.append(Idict)
                if index%300 == 0:
                    collection.insert_many(dlist)
                    dlist = []
                index = index + 1
                print(index)
    

    每300条数据进行批量插入一次

    1. 功能强大的bulk
    #有条理的大规模数据写入
    bulk = db.test.initialize_ordered_bulk_op()
    # Remove all documents from the previous example.
    bulk.find({}).remove()
    bulk.insert({‘_id‘: 1})
    bulk.insert({‘_id‘: 2})
    bulk.insert({‘_id‘: 3})
    #更新
    bulk.find({‘_id‘: 1}).update({‘$set‘: {‘foo‘: ‘bar‘}})
    #插入替换
    bulk.find({‘_id‘: 4}).upsert().update({‘$inc‘: {‘j‘: 1}})
    #替换
    bulk.find({‘j‘: 1}).replace_one({‘j‘: 2})
    #execute是执行
    result = bulk.execute()
    pprint(result)
    
    import pymongo
    
    
    bulk = pymongo.bulk.BulkOperationBuilder(collection, ordered=False)
    for task in tasks:
        bulk.insert({"Url": task, "Mark": 0})
    bulk.execute()
    
    bulk_write
    from pymongo import UpdateOne, ReplaceOne # InsertOne, DeleteOne
    >>> from pymongo import MongoClient as MC
    >>> db = MC().test.test1
     
    >>> db.count()
    1
     
    >>> list(db.find())
    [{u'_id': u'a', u'n': u'a'}]
     
    #  先用 UpdateOne 测试一下
    >>> db.bulk_write([UpdateOne({"_id":"a"},{"$set":{"n":"aa"}}, upsert=True), UpdateOne({"_id":"b"},{"$set":{"n":"b"}}, upsert=True)])
    0x102d4d370>
    >>> list(db.find())
    [{u'_id': u'a', u'n': u'aa'}, {u'_id': u'b', u'n': u'b'}]
    # 可以看到 记录 a 的 "n" 成功被更新为 "aa", 并且新增了一条记录 b
    # 再用 ReplaceOne 测试一下
    >>> db.bulk_write([ReplaceOne({"_id":"b"},{"n":"bb"}, upsert=True), ReplaceOne({"_id":"c"},{"n":"cc"}, upsert=True)])
    0x102d4d370>
    >>> list(db.find())
    [{u'_id': u'a', u'n': u'aa'}, {u'_id': u'b', u'n': u'bb'}, {u'_id': u'c', u'n': u'cc'}]
    

    查看数据条数

    cd  ~/mongodb/mongodb-linux-x86_64-ubuntu1604-4.0.4/bin
    
    mongo 127.0.0.1:27018/admin -u root -p 123456
    
    db.bulkCollection.count()
    

    相关文章

      网友评论

        本文标题:将Tsv批量导入mongodb

        本文链接:https://www.haomeiwen.com/subject/lozoeqtx.html