mongodb
## 安装
pip install pymongo
## 使用
from pymongo import MongoClient
conn = MongoClient('192.168.0.113', 27017)
db = conn.mydb #连接mydb数据库,没有则自动创建
my_set = db.test_set #使用test_set集合,没有则自动创建
### 增
db.COLLECTION_NAME.insert(document)
### 更新
my_set.update(
<query>, #查询条件
<update>, #update的对象和一些更新的操作符
{
upsert: <boolean>, #如果不存在update的记录,是否插入
multi: <boolean>, #可选,mongodb 默认是false,只更新找到的第一条记录
writeConcern: <document> #可选,抛出异常的级别。
}
)
//删除
my_set.remove(
<query>, #(可选)删除的文档的条件
{
justOne: <boolean>, #(可选)如果设为 true 或 1,则只删除一个文档
writeConcern: <document> #(可选)抛出异常的级别
}
)
//查
my_set.find({"name":"zhangsan"}).sort([("age",1)])
my_set.find({"age":{"$gt":25}}) //$lte
my_set.find({"name":"zhangsan"}).skip(2).limit(6)
my_set.find({"age":{"$in":(20,30,35)}})
my_set.find({"$or":[{"age":20},{"age":35}]})
my_set.find({'li':{'$all':[1,2,3,4]}})
//数组
my_set.update({'name':"lisi"}, {'$push':{'li':4}})
for i in my_set.find({'name':"lisi"}):
print(i)
#输出:{'li': [1, 2, 3, 4], '_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'age': 18, 'name': 'lisi'}
my_set.update({'name':"lisi"}, {'$pushAll':{'li':[4,5]}})
for i in my_set.find({'name':"lisi"}):
print(i)
#输出:{'li': [1, 2, 3, 4, 4, 5], 'name': 'lisi', 'age': 18, '_id': ObjectId('58c50d784fc9d44ad8f2e803')}
#pop
#移除最后一个元素(-1为移除第一个)
my_set.update({'name':"lisi"}, {'$pop':{'li':1}})
for i in my_set.find({'name':"lisi"}):
print(i)
#输出:{'_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'age': 18, 'name': 'lisi', 'li': [1, 2, 3, 4, 4]}
#pull (按值移除)
#移除3
my_set.update({'name':"lisi"}, {'$pop':{'li':3}})
#pullAll (移除全部符合条件的)
my_set.update({'name':"lisi"}, {'$pullAll':{'li':[1,2,3]}})
for i in my_set.find({'name':"lisi"}):
print(i)
#输出:{'name': 'lisi', '_id': ObjectId('58c50d784fc9d44ad8f2e803'), 'li': [4, 4], 'age': 18}
my_set.find_one({"contact.1.iphone":"222"})
//多级对象
my_set.find({"contact.iphone":"11223344"})
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pymongo import MongoClient
settings = {
"ip":'192.168.0.113', #ip
"port":27017, #端口
"db_name" : "mydb", #数据库名字
"set_name" : "test_set" #集合名字
}
class MyMongoDB(object):
def __init__(self):
try:
self.conn = MongoClient(settings["ip"], settings["port"])
except Exception as e:
print(e)
self.db = self.conn[settings["db_name"]]
self.my_set = self.db[settings["set_name"]]
def insert(self,dic):
print("inser...")
self.my_set.insert(dic)
def update(self,dic,newdic):
print("update...")
self.my_set.update(dic,newdic)
def delete(self,dic):
print("delete...")
self.my_set.remove(dic)
def dbfind(self,dic):
print("find...")
data = self.my_set.find(dic)
for result in data:
print(result["name"],result["age"])
def main():
dic={"name":"zhangsan","age":18}
mongo = MyMongoDB()
mongo.insert(dic)
mongo.dbfind({"name":"zhangsan"})
mongo.update({"name":"zhangsan"},{"$set":{"age":"25"}})
mongo.dbfind({"name":"zhangsan"})
mongo.delete({"name":"zhangsan"})
mongo.dbfind({"name":"zhangsan"})
if __name__ == "__main__":
main()
mysql
pip3 install PyMySQL
import pymysql
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
cursor = db.cursor()
### 创建表
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
cursor.execute(sql)
db.close()
### 插入数据
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 关闭数据库连接
db.close()
### 查询,cursor:
fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
fetchall(): 接收全部的返回结果行.
rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。
redis
pip3 install redis
import redis
pool= redis.ConnectionPool(host='localhost',port=6379,decode_responses=True)
r=redis.Redis(connection_pool=pool)
r.set('apple','a')
print(r.get('apple'))
--
redis中set() ==>r.set()
redis中setnx() ==>r.set()
redis中setex() ==>r.setex()
redis中setbit() ==>r.setbit()
redis中mset() == > r.mset()
redis中hset() ==>r.hset()
redis中sadd() == >r.sadd()
r.hset('info','name','lilei')
r.hset('info','age','18')
print(r.hgetall('info'))
r.sadd('course','math','english','chinese')
-- 管道代替事务
pipe=r.pipeline()
print(r.get('a'))
try:
# pipe.watch('a')
pipe.multi()
pipe.set('here', 'there')
pipe.set('here1', 'there1')
pipe.set('here2', 'there2')
time.sleep(5)
pipe.execute()
except redis.exceptions.WatchError as e:
print("Error")
网友评论