美文网首页
12.接口自动化之工具封装006:python 连接数据库(my

12.接口自动化之工具封装006:python 连接数据库(my

作者: 小小一颗卤蛋 | 来源:发表于2024-05-08 18:54 被阅读0次

python 连接mysql数据库

image.png
import pymysql #引入操作数据库的库
#打开数据库连接
db=pymysql.connect(host='数据库ip',port=3306,user='用户名',passwd='密码',database='数据库',charset='utf8')
#创建游标
cursor=db.cursor()

#执行sql语句
cursor.execute('select version()');
result=cursor.fetchone()
print(result)

#查询操作
sql='select * from t_cms_article';
cursor.execute(sql)

#结果是返回第一个结果
cursor.fetchone()
#返回一个元组
result=cursor.fetchone()
print(result)
#获取所有结果
cursor.fetchall()
# 返回元组套元组
all_result=cursor.fetchall()
print(all_result)
#增加操作
sql='insert into t_sys_login_log(ip,logname,message)values("127.0.0.1","sq","登录成功")';
cursor.execute(sql)
#除了查询操作,其他操作都需要commit一下,才可以同步到数据库
db.commit()
#修改操作
sql='update t_sys_login_log set ip="localhost" where logname="sq"';
cursor.execute(sql)
db.commit()
#删除操作
sql='delete from t_sys_login_log where logname="sq"';
cursor.execute(sql)
#回滚,恢复到上一次的状态
db.rollback()
db.commit()
#关闭释放游标和连接
cursor.close()
db.close()

封装的方式

#可以封装成类,和对应的方法
import pymysql

class DBConnection:
    def __init__(self,host='192.168.111.128',port=3306,user='root',passwd='sq',database='sq-waimai'):
        self.db=pymysql.connect(host=host,port=port,user=user,passwd=passwd,database=database)
        self.cursor=self.db.cursor()
    #查询方法
    def select(self,sql,many=True):
        self.cursor.execute(sql)    
        if many:
            result = self.cursor.fetchall()
        else:
            result = self.cursor.fetchone()
        return result
    
    def do(self,sql):
        try:
            self.cursor.execute(sql)
        except Exception as error:
            self.db.rollback()
            print(f'mysqldb error{error}')
        raise error
        self.db.commit()
    #增加操作
    def insert(self,sql):
        self.do(sql)
    #修改操作
    def update(self,sql)    :
        self.do(sql)
    #删除操作
    def delete(self,sql):
        self.do(sql)
    #关闭游标和连接
    def exit(self):
        self.cursor.close()
        self.db.close()
if __name__ ==  '   __main__':
    db  =   DBConnection()
    sql =   '   select  *   from    t_cms_article';
    res =   db. select(sql)
    print(res)
    db.exit()

python 连接mango数据库

image.png
import pymongo
class MongoDB:
    def __init__(self,user='admin',pwd='sq',ip='192.168.226.129',port=27017):
        '''

        :param user:用户名
        :param pwd: 密码
        :param ip: ip地址
        :param port: 端口号
        '''
        self.user=user
        self.pwd=pwd
        self.ip=ip
        self.port=port
        self.client = pymongo.MongoClient(f"mongodb://{user}:{pwd}@{ip}:{port}/")

    def insert(self,collection,query,db='sq-waimai'):
        '''

        :param db: 使用的数据库
        :param collection: 使用的集合
        :param query: 添加的信息
        :return:
        '''
        db=self.client[db]
        collection=db[collection]
        collection.insert_one(query)

    def find_one(self,collection,query,db='sq-waimai'):
        '''

        :param db: 使用的数据库
        :param collection: 使用的集合
        :param query: 查询的信息
        :return: 查询结果
        '''
        db=self.client[db]
        collection=db[collection]
        result=collection.find_one(query)
        return result

    def find_all(self,collection,query,db='sq-waimai'):
        '''

        :param db: 使用的数据库
        :param collection: 使用的集合
        :param query: 查询的信息
        :return: 查询返回所有结果
        '''
        db=self.client[db]
        collection=db[collection]
        results=collection.find(query)
        return results

    def update_one(self,collection,myquery,newvalues,db='sq-waimai'):
        '''

        :param db: 使用的数据库
        :param collection: 使用的集合
        :param myquery: 查询的信息
        :param newvalues: 修改的信息
        :return:
        '''
        db=self.client[db]
        collection=db[collection]
        collection.update_one(myquery,{'$set':newvalues})
        #collection.update_many(myquery, {'$set': newvalues}) 修改所有的

    def delete_one(self,collection,query,db='sq-waimai'):
        '''

        :param db: 使用数据库
        :param collection: 使用的集合
        :param query: 信息
        :return:
        '''
        db=self.client[db]
        collection=db[collection]
        collection.delete_one(query)

    def delete_all(self,collection,query,db='sq-waimai'):
        '''

        :param db: 使用数据库
        :param collection: 使用的集合
        :param query: 信息
        :return:
        '''
        db=self.client[db]
        collection=db[collection]
        collection.delete_many(query)

if __name__ == '__main__':
    #1-创建mongodb数据库连接
    db=MongoDB()
    # #2- 插入数据
    # db.insert('foods',{'restaurant_id':3269,"category_id":3333,"description":'xx非常好吃',"item_id":6872,'name':'烤肉饭'})
    # #3- 查一条数据
    # res=db.find_one('foods',{'restaurant_id':3269})
    # #4- 打印这一条数据
    # print(res)
    # #5- 删除所有数据
    # # db.delete_all('foods',{'restaurant_id':3269})
    # #6-查所有数据
    # res1=db.find_all('foods',{'restaurant_id':3269})
    # #7-打印所有数据
    # print(res1)
    # for one in res1:
    #     print(one)

    #8- 更新数据
    db.update_one('shops',{'name':'淘汰郎小火锅桐乡店'},{'name':'xintain火锅店'})
    res1 = db.find_one('shops', {'id':1})
    print(res1)

python 连接DBC数据库

import pymysql
class DBConnection:
    def __init__(self, ip='', port=3306,user='root', passwd='sq', db=''):
        self.ip = ip
        self.port=port
        self.user = user
        self.passwd = passwd
        self.db = db
    def getCon(self):
        try:
            conn = pymysql.connect(host=self.ip, port=self.port, user=self.user, passwd=self.passwd,
                                        database=self.db)
            return conn
        except pymysql.Error as e:
            print(f'mysqldb error :{e}')

    def select(self,sql):
        try:
            con=self.getCon()
            cur=con.cursor()
            cur.execute(sql)
            result=cur.fetchall()
            return result
        except pymysql.Error as e:
            print(f'mysqldb error :{e}')
        finally:
            cur.close()
            con.close()
    def update(self,sql):
        try:
            con = self.getCon()
            cur = con.cursor()
            cur.execute(sql)
            con.commit()
        except pymysql.Error as e:
            con.rollback()
            print(f'mysqldb error :{e}')
        finally:
            cur.close()
            con.close()
    def insert(self,sql):
        try:
            con = self.getCon()
            cur = con.cursor()
            cur.execute(sql)
            con.commit()
        except pymysql.Error as e:
            con.rollback()
            print(f'mysqldb error :{e}')
        finally:
            cur.close()
            con.close()
    def delete(self,sql):
        try:
            con = self.getCon()
            cur = con.cursor()
            cur.execute(sql)
            con.commit()
        except pymysql.Error as e:
            con.rollback()
            print(f'mysqldb error :{e}')
        finally:
            cur.close()
            con.close()







if __name__ == '__main__':
    # db=DBConnection(ip='192.168.19.144',db="sq-waimai")
    # print(db.select("select * from `t_sys_dept` WHERE `id` = 3278"))
    # db.update("UPDATE `sq-waimai`.`t_sys_dept` SET `fullname` = '巴奴火锅桐110' WHERE `id` = 3278")
    # print(db.select("select * from `t_sys_dept` WHERE `id` = 3278"))
    db=DBConnection(ip='192.168.226.129',db="sq-waimai")#连接mysql数据库---外卖数据库
    res1=db.select('select * from t_sys_dept where id=1;')#查询店铺全称
    print(res1)
    db.update("UPDATE t_sys_dept SET fullname = 'sq淘汰郎小火锅桐乡店总部' WHERE id = 1;")#修改店铺全称
    res2=db.select('select * from t_sys_dept where id=1;')
    print(res2)

相关文章

网友评论

      本文标题:12.接口自动化之工具封装006:python 连接数据库(my

      本文链接:https://www.haomeiwen.com/subject/pjaacdtx.html