美文网首页python3自写小工具
封装pymysql,方便快速调用(python3)

封装pymysql,方便快速调用(python3)

作者: AmanWang | 来源:发表于2020-09-14 13:55 被阅读0次

    介绍

    这是通过对pymysql进行的二次简单封装,封装后可以很方便地调用以对mysql数据库进行操作。
    引用方式:在要使用数据库的地方import该对象,然后通过AmanMySQL(mysql["host"], mysql["user"], mysql["passwd"], mysql["dbName"])连接数据库赋值给某一变量,之后就可以通过对应变量对select、update、insert、delete等对数据库做操作。源码和示例如下:

    源码

    #encoding: utf-8
    
    import pymysql
    
    class AmanMySQL():
        # 初始化
        def __init__(self, host, user, passwd, dbName):
            self.host = host
            self.user = user
            self.passwd = passwd
            self.dbName = dbName
        
        # 连接数据库,需要传数据库地址、用户名、密码、数据库名称,默认设置了编码信息
        def connet(self):
            try:
                self.db = pymysql.connect(self.host, self.user, self.passwd, self.dbName, use_unicode=True, charset='utf8')
                self.cursor = self.db.cursor()
            except Exception as e:
                return e
        
         # 关闭数据库连接 
        def close(self):
            try:
                self.cursor.close()
                self.db.close()
            except Exception as e:
                return e
    
         # 查询操作,查询单条数据
        def get_one(self, sql):
            # res = None
            try:
                self.connet()
                self.cursor.execute(sql)
                res = self.cursor.fetchone()
                self.close()
            except Exception:
                res = None
            return res
    
         # 查询操作,查询多条数据
        def get_all(self, sql):
            # res = ()
            try:
                self.connet()
                self.cursor.execute(sql)
                res = self.cursor.fetchall()
                self.close()
            except Exception:
                res = ()
            return res
       
         # 查询数据库对象
        def get_all_obj(self, sql, tableName, *args):
            resList = []
            fieldsList = []
            try:
                if (len(args) > 0):
                    for item in args:
                        fieldsList.append(item)
                else:
                    fieldsSql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and table_schema = '%s'" % (
                        tableName, self.dbName)
                    fields = self.get_all(fieldsSql)
                    for item in fields:
                        fieldsList.append(item[0])
    
                # 执行查询数据sql
                res = self.get_all(sql)
                for item in res:
                    obj = {}
                    count = 0
                    for x in item:
                        obj[fieldsList[count]] = x
                        count += 1
                    resList.append(obj)
                return resList
            except Exception as e:
                return e
    
         # 数据库插入、更新、删除操作
        def insert(self, sql):
            return self.__edit(sql)
        def update(self, sql):
            return self.__edit(sql)
        def delete(self, sql):
            return self.__edit(sql)
        def __edit(self, sql):
            # count = 0
            try:
                self.connet()
                count = self.cursor.execute(sql)
                self.db.commit()
                self.close()
            except Exception as e:
                self.db.rollback()
                count = 0
            return count
    

    调用示例

    # 调用
    if __name__ == '__main__':
        # 使用
        mysql = {
            "host": "127.0.0.1",
            "port": "3306",
            "user": "test",
            "passwd": "123456",
            "dbName": "testMysql",
        }
        useDB = AmanMySQL(mysql["host"], mysql["user"], mysql["passwd"], mysql["dbName"])
    
        # 查询
        print('************************************select查询*********************************')
        sql = "select case_name from case_manage"   # 不带条件sql
        sqlWithCondition = "select case_name from case_manage where create_at > '2020-08-20 11:18:47'"   # 带条件sql
        getOne = useDB.get_one(sql)
        getAll = useDB.get_all(sql)
        getAllWithCondition = useDB.get_all(sqlWithCondition)
        print('len(getOne)=%d' %(len(getOne), ))
        print('getOne=%s' %(getOne, ))
        print('len(getAll)=%d' % (len(getAll),))
        print('getAll=%s' %(getAll, ))
        print('len(getAllWithCondition)=%d' % (len(getAllWithCondition),))
        print('getAllWithCondition=%s' % (getAllWithCondition,))
        print()
    
        # 更新
        print('************************************update更新*********************************')
        select_sql_1 = "select case_name from case_manage where case_id=2"
        update_sql = "update case_manage set case_name = '测试测试-06' where case_id=2"
        select_res_1 = useDB.get_one(select_sql_1)
        update_res = useDB.update(update_sql)
        select_res_2 = useDB.get_one(select_sql_1)
        print('数据更新前:%s' %(select_res_1,))
        print('数据更新结果:%s' %(update_res,))
        print('数据更新后:%s' % (select_res_2,))
        print()
    
        # 删除
        print('************************************delete删除*********************************')
        select_sql_2 = "select count(1) from case_group"
        insert_sql = "delete from case_group where group_name = '测试插入数据'"
        select_res_3 = useDB.get_one(select_sql_2)
        delete_res = useDB.insert(insert_sql)
        select_res_4 = useDB.get_one(select_sql_2)
        print('数据删除前行数:%s' % (select_res_3,))
        print('数据删除结果:%s' % (delete_res,))
        print('数据删除后行数:%s' % (select_res_4,))
        print()
    
        # 插入
        print('************************************insert插入*********************************')
        select_sql_3 = "select count(1) from case_group"
        insert_sql = "insert  into case_group(group_name,group_code,ext,create_by,update_by,create_at,update_at,status,isDel) values ('测试插入数据','hello','0','管理员','乾隆大帝','2020-06-15 10:37:56','2020-06-16 15:43:03',1,0)"
        select_res_5 = useDB.get_one(select_sql_3)
        insert_res = useDB.insert(insert_sql)
        select_res_6 = useDB.get_one(select_sql_3)
        print('数据插入前行数:%s' % (select_res_5,))
        print('数据插入结果:%s' % (insert_res,))
        print('数据插入后行数:%s' % (select_res_6,))
        print()
    
    
    # 结果
    ************************************select查询*********************************
    len(getOne)=1
    getOne=('123',)
    len(getAll)=53
    getAll=(('123',), ('23',), ('aaaa',), ('dfgs规范的的',), ('sdfsdf奋斗奋斗',), ('修改购物车商品数量',), ('加购物车',), ('取消选中购物车商品',), ('商品搜索-价格由低到高排序',), ('商品搜索-价格由高到低排序',), ('商品搜索-第二页排序',), ('商品搜索-筛选排序',), ('商品搜索-综合排序',), ('商品搜索-销量排序',), ('商品详情-单参数',), ('商品详情页-多参数',), ('商品详情页获取推荐商品',), ('孕期-首页-胎宝宝变化',), ('我就是我',), ('我就是我44',), ('搜索-妙招01',), ('搜索页面html',), ('撒旦发射',), ('检查可售区域',), ('欧电首页',), ('测试00121',), ('测试测试-03',), ('测试补订单',), ('消息中心-关注我的',), ('消息中心-系统消息',), ('消息中心-系统消息23',), ('登录-测试环境',), ('获取优惠券',), ('获取分享信息',), ('获取分销员信息',), ('获取历史搜索记录',), ('获取商品media',), ('获取商品价格和库存',), ('获取商品售后信息',), ('获取商品基础信息',), ('获取商品属性',), ('获取商品销量',), ('获取商家信息',), ('获取商家商品描述',), ('获取搜索热词',), ('获取用户信息',), ('获取用户收货地址',), ('获取电商弹窗数据',), ('获取购物车商品list',), ('购物车商品计数',), ('选中购物车商品',), ('首页-index',), ('首页33消息中心',))
    len(getAllWithCondition)=8
    getAllWithCondition=(('商品搜索-第二页排序',), ('获取购物车商品list',), ('购物车商品计数',), ('修改购物车商品数量',), ('取消选中购物车商品',), ('选中购物车商品',), ('登录-测试环境',), ('消息中心-系统消息23',))
    
    ************************************update更新*********************************
    数据更新前:('测试测试-03',)
    数据更新结果:1
    数据更新后:('测试测试-06',)
    
    ************************************delete删除*********************************
    数据删除前行数:(16,)
    数据删除结果:1
    数据删除后行数:(15,)
    
    ************************************insert插入*********************************
    数据插入前行数:(15,)
    数据插入结果:1
    数据插入后行数:(16,)
    

    相关文章

      网友评论

        本文标题:封装pymysql,方便快速调用(python3)

        本文链接:https://www.haomeiwen.com/subject/vrnaektx.html