python 连接mysql数据库
image.pngimport pymysql #引入操作数据库的库
#打开数据库连接
db=pymysql.connect(host='数据库ip',port=3306,user='用户名',passwd='密码',database='数据库',charset='utf8')
#创建游标
cursor=db.cursor()
#执行sql语句
cursor.execute('select version()');
result=cursor.fetchone()
print(result)
#查询操作
sql='select * from t_cms_article';
cursor.execute(sql)
#结果是返回第一个结果
cursor.fetchone()
#返回一个元组
result=cursor.fetchone()
print(result)
#获取所有结果
cursor.fetchall()
# 返回元组套元组
all_result=cursor.fetchall()
print(all_result)
#增加操作
sql='insert into t_sys_login_log(ip,logname,message)values("127.0.0.1","sq","登录成功")';
cursor.execute(sql)
#除了查询操作,其他操作都需要commit一下,才可以同步到数据库
db.commit()
#修改操作
sql='update t_sys_login_log set ip="localhost" where logname="sq"';
cursor.execute(sql)
db.commit()
#删除操作
sql='delete from t_sys_login_log where logname="sq"';
cursor.execute(sql)
#回滚,恢复到上一次的状态
db.rollback()
db.commit()
#关闭释放游标和连接
cursor.close()
db.close()
封装的方式
#可以封装成类,和对应的方法
import pymysql
class DBConnection:
def __init__(self,host='192.168.111.128',port=3306,user='root',passwd='sq',database='sq-waimai'):
self.db=pymysql.connect(host=host,port=port,user=user,passwd=passwd,database=database)
self.cursor=self.db.cursor()
#查询方法
def select(self,sql,many=True):
self.cursor.execute(sql)
if many:
result = self.cursor.fetchall()
else:
result = self.cursor.fetchone()
return result
def do(self,sql):
try:
self.cursor.execute(sql)
except Exception as error:
self.db.rollback()
print(f'mysqldb error{error}')
raise error
self.db.commit()
#增加操作
def insert(self,sql):
self.do(sql)
#修改操作
def update(self,sql) :
self.do(sql)
#删除操作
def delete(self,sql):
self.do(sql)
#关闭游标和连接
def exit(self):
self.cursor.close()
self.db.close()
if __name__ == ' __main__':
db = DBConnection()
sql = ' select * from t_cms_article';
res = db. select(sql)
print(res)
db.exit()
python 连接mango数据库
image.pngimport pymongo
class MongoDB:
def __init__(self,user='admin',pwd='sq',ip='192.168.226.129',port=27017):
'''
:param user:用户名
:param pwd: 密码
:param ip: ip地址
:param port: 端口号
'''
self.user=user
self.pwd=pwd
self.ip=ip
self.port=port
self.client = pymongo.MongoClient(f"mongodb://{user}:{pwd}@{ip}:{port}/")
def insert(self,collection,query,db='sq-waimai'):
'''
:param db: 使用的数据库
:param collection: 使用的集合
:param query: 添加的信息
:return:
'''
db=self.client[db]
collection=db[collection]
collection.insert_one(query)
def find_one(self,collection,query,db='sq-waimai'):
'''
:param db: 使用的数据库
:param collection: 使用的集合
:param query: 查询的信息
:return: 查询结果
'''
db=self.client[db]
collection=db[collection]
result=collection.find_one(query)
return result
def find_all(self,collection,query,db='sq-waimai'):
'''
:param db: 使用的数据库
:param collection: 使用的集合
:param query: 查询的信息
:return: 查询返回所有结果
'''
db=self.client[db]
collection=db[collection]
results=collection.find(query)
return results
def update_one(self,collection,myquery,newvalues,db='sq-waimai'):
'''
:param db: 使用的数据库
:param collection: 使用的集合
:param myquery: 查询的信息
:param newvalues: 修改的信息
:return:
'''
db=self.client[db]
collection=db[collection]
collection.update_one(myquery,{'$set':newvalues})
#collection.update_many(myquery, {'$set': newvalues}) 修改所有的
def delete_one(self,collection,query,db='sq-waimai'):
'''
:param db: 使用数据库
:param collection: 使用的集合
:param query: 信息
:return:
'''
db=self.client[db]
collection=db[collection]
collection.delete_one(query)
def delete_all(self,collection,query,db='sq-waimai'):
'''
:param db: 使用数据库
:param collection: 使用的集合
:param query: 信息
:return:
'''
db=self.client[db]
collection=db[collection]
collection.delete_many(query)
if __name__ == '__main__':
#1-创建mongodb数据库连接
db=MongoDB()
# #2- 插入数据
# db.insert('foods',{'restaurant_id':3269,"category_id":3333,"description":'xx非常好吃',"item_id":6872,'name':'烤肉饭'})
# #3- 查一条数据
# res=db.find_one('foods',{'restaurant_id':3269})
# #4- 打印这一条数据
# print(res)
# #5- 删除所有数据
# # db.delete_all('foods',{'restaurant_id':3269})
# #6-查所有数据
# res1=db.find_all('foods',{'restaurant_id':3269})
# #7-打印所有数据
# print(res1)
# for one in res1:
# print(one)
#8- 更新数据
db.update_one('shops',{'name':'淘汰郎小火锅桐乡店'},{'name':'xintain火锅店'})
res1 = db.find_one('shops', {'id':1})
print(res1)
python 连接DBC数据库
import pymysql
class DBConnection:
def __init__(self, ip='', port=3306,user='root', passwd='sq', db=''):
self.ip = ip
self.port=port
self.user = user
self.passwd = passwd
self.db = db
def getCon(self):
try:
conn = pymysql.connect(host=self.ip, port=self.port, user=self.user, passwd=self.passwd,
database=self.db)
return conn
except pymysql.Error as e:
print(f'mysqldb error :{e}')
def select(self,sql):
try:
con=self.getCon()
cur=con.cursor()
cur.execute(sql)
result=cur.fetchall()
return result
except pymysql.Error as e:
print(f'mysqldb error :{e}')
finally:
cur.close()
con.close()
def update(self,sql):
try:
con = self.getCon()
cur = con.cursor()
cur.execute(sql)
con.commit()
except pymysql.Error as e:
con.rollback()
print(f'mysqldb error :{e}')
finally:
cur.close()
con.close()
def insert(self,sql):
try:
con = self.getCon()
cur = con.cursor()
cur.execute(sql)
con.commit()
except pymysql.Error as e:
con.rollback()
print(f'mysqldb error :{e}')
finally:
cur.close()
con.close()
def delete(self,sql):
try:
con = self.getCon()
cur = con.cursor()
cur.execute(sql)
con.commit()
except pymysql.Error as e:
con.rollback()
print(f'mysqldb error :{e}')
finally:
cur.close()
con.close()
if __name__ == '__main__':
# db=DBConnection(ip='192.168.19.144',db="sq-waimai")
# print(db.select("select * from `t_sys_dept` WHERE `id` = 3278"))
# db.update("UPDATE `sq-waimai`.`t_sys_dept` SET `fullname` = '巴奴火锅桐110' WHERE `id` = 3278")
# print(db.select("select * from `t_sys_dept` WHERE `id` = 3278"))
db=DBConnection(ip='192.168.226.129',db="sq-waimai")#连接mysql数据库---外卖数据库
res1=db.select('select * from t_sys_dept where id=1;')#查询店铺全称
print(res1)
db.update("UPDATE t_sys_dept SET fullname = 'sq淘汰郎小火锅桐乡店总部' WHERE id = 1;")#修改店铺全称
res2=db.select('select * from t_sys_dept where id=1;')
print(res2)
网友评论