使用用户名、密码
安装PyMongo:pip install pymongo
import pymongo #导入pymongo
class client_Mongo(object):
def __init__(self, host, port, db, p, u, w):
self.host = host
self.port = port
self.username = u
self.password = w
self.client = pymongo.MongoClient(host=self.host,
port=self.port,
connect=False, # 防止多线程造成卡顿
)
self.client.gdata.authenticate(self.username, self.password, mechanism='SCRAM-SHA-1')
self.db = self.client[db] # 链接myTest数据库
self.p = self.db[p] # persons集合
def insert(self, person):
# 插入数据
try:
self.p.insert(person) # 可以是列表,保存多个
print('存储成功', person['_id'])
except Exception as e:
if 'E11000' in str(e):
print('数据重复', person['_id'])
else:
print(e)
def find(self, p):
# 查询数据
try:
result = self.p.find_one(p)
return result
except Exception as e:
print(e)
def updata(self, p, up):
# 更新
if '$set' not in up: # 如需重置集合表,注释此项
raise Exception('未添加[set]函数') # 如需重置集合表,注释此项
try:
result = self.p.update(p, up)
return result
except Exception as e:
print(e)
def find_stored(self, lang):
# 排序查询
try:
result = self.p.find().sort([('排序字段', -1)]).limit(1).skip(0)
return result
except Exception as e:
print(e)
def tj_find_all(self, params):
# 参数查询
try:
result = self.p.find(params, no_cursor_timeout=True) # no_cursor_timeout 防止长时间链接卡顿
return result
except Exception as e:
print(e)
def del_data(self, person):
# 删除
try:
result = self.p.delete_many(person)
print(result)
except Exception as e:
print(e)
if __name__ == '__main__':
mongo = client_Mongo(host='localhost', # IP
port=27017, # 端口
db='xxx', # 数据库
p='xxx', # 表
u='xxx', # 用户名
w='xxx', # 密码
)
如需转载请注明出处
网友评论