美文网首页
python数据库类

python数据库类

作者: 山猪打不过家猪 | 来源:发表于2021-08-31 10:15 被阅读0次
    #!/usr/bin/env python
    # coding:UTF-8
     
     
    """
    @version: python3.x
    @author:曹新健
    @contact: 617349013@qq.com
    @software: PyCharm
    @file: dbSql.py
    @time: 2018/9/22 17:47
    """
     
    import pymysql
    import logging
    import sys
     
    # 加入日志
    # 获取logger实例
    logger = logging.getLogger("dbSql")
    # 指定输出格式
    formatter = logging.Formatter('%(asctime)s\
                  %(levelname)-8s:%(message)s')
    # 文件日志
    file_handler = logging.FileHandler("dbSql.log")
    file_handler.setFormatter(formatter)
    # 控制台日志
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
     
    # 为logge添加具体的日志处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
     
    logger.setLevel(logging.INFO)
     
     
    class DbManager:
        # 构造函数
        def __init__(self, host='127.0.0.1',port=3306, user='cxj',
    passwd='123456', db='cxjtest',charset='utf8'):
            self.host = host
            self.port = port
            self.user = user
            self.passwd = passwd
            self.db = db
            self.charset = charset
            self.conn = None
            self.cur = None
     
        # 连接数据库
        def connectDatabase(self):
            try:
                self.conn = pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)
            except:
                logger.error("connectDatabase failed")
                return False
            self.cur = self.conn.cursor()
            return True
     
        # 关闭数据库
        def close(self):
            # 如果数据打开,则关闭;否则没有操作
            if self.conn and self.cur:
                self.cur.close()
                self.conn.close()
            return True
     
        # 执行数据库的sq语句,主要用来做插入操作
        def execute(self, sql,params=None,commit=False,):
            # 连接数据库
            res = self.connectDatabase()
            if not res:
                return False
            try:
                if self.conn and self.cur:
                    # 正常逻辑,执行sql,提交操作
                    rowcount = self.cur.execute(sql, params)
                    #print(rowcount)
                    if commit:
                        self.conn.commit()
                    else:
                        pass
            except:
                logger.error("execute failed: " + sql)
                logger.error("params: " + str(params))
                self.close()
                return False
            return rowcount
     
        # 查询所有数据
        def fetchall(self, sql, params=None):
            res = self.execute(sql, params)
            if not res:
                logger.info("查询失败")
                return False
            self.close()
            results = self.cur.fetchall()
            logger.info("查询成功" + str(results))
            return results
     
         # 查询一条数据
        def fetchone(self, sql, params=None):
            res = self.execute(sql, params)
            if not res:
                logger.info("查询失败")
                return False
            self.close()
            result = self.cur.fetchone()
            logger.info("查询成功" + str(result))
            return result
     
        # 增删改数据
        def edit(self, sql,params=None):
            res = self.execute(sql,params,True)
            if not res:
                logger.info("操作失败")
                return False
            self.conn.commit()
            self.close()
            logger.info("操作成功" + str(res))
            return res
     
     
    if __name__ == '__main__':
        dbManager = DbManager()
        """
        sql = "select * from bandcard WHERE money>%s;"
        values = [1000]
        result = dbManager.fetchall(sql, values)
        """
        sql = "insert into bandcard values %s,%s,%s;"
        values = [(0, 100), (0, 200), (0, 300)]
        result = dbManager.edit(sql,values)
    
    
    import re
    
    import pymysql
    """
    connect连接对象的方法:
    
    close()  --关闭的方法
    
    commit()   --如果支持事务则提交挂起的事务
    
    rollback()  --回滚挂起的事务
    
    cursor()  --返回连接的游标对象
    游标对象的方法:
    
    callproc(name,[params]) --用来执行存储过程,接收的参数为存储过程的名字和参数列表,返回受影响的行数
    
    close()  --关闭游标
    
    execute(sql,[params])--执行sql语句,可以使用参数,(使用参数时,sql语句中用%s进行站位注值),返回受影响的行数
    
    executemany(sql,params)--执行单挑sql语句,但是重复执行参数列表里的参数,返回受影响的行数
    
    fetchone()  --返回结果的下一行
    
    fetchall()  --返回结果的 所有行
    
    fetchmany(size)--返回size条记录,如果size大于返回结果行的数量,则会返回cursor.arraysize条记录
    
    nextset()  --条至下一行
    
    setinputsizes(size)--定义cursor
    
    游标对象的属性:
    
    description--结果列的描述,只读
    
    rowcount  --结果中的行数,只读
    
    arraysize  --fetchmany返回的行数,默认为1
    
    """
    class MysqldbHelper(object):
        """操作mysql数据库,基本方法
    
            """
        def __init__(self , host="localhost", username="root", password="", port=3306, database="python_test"):
            self.host = host
            self.username = username
            self.password = password
            self.database = database
            self.port = port
            self.con = None
            self.cur = None
            try:
                self.con = pymysql.connect(host=self.host, user=self.username, passwd=self.password, port=self.port, db=self.database)
                # 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
                self.cur = self.con.cursor()
            except:
                raise "DataBase connect error,please check the db config."
    
        def close(self):
            """关闭数据库连接
    
            """
            if not  self.con:
                self.con.close()
            else:
                raise "DataBase doesn't connect,close connectiong error;please check the db config."
    
        def getVersion(self):
            """获取数据库的版本号
    
            """
            self.cur.execute("SELECT VERSION()")
            return self.getOneData()
    
        def getOneData(self):
            # 取得上个查询的结果,是单个结果
            data = self.cur.fetchone()
            return data
    
        def creatTable(self, tablename, attrdict, constraint):
            """创建数据库表
    
                args:
                    tablename  :表名字
                    attrdict   :属性键值对,{'book_name':'varchar(200) NOT NULL'...}
                    constraint :主外键约束,PRIMARY KEY(`id`)
            """
            if self.isExistTable(tablename):
                return
            sql = ''
            sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,'
            for attr,value in attrdict.items():
                sql_mid = sql_mid + '`'+attr + '`'+' '+ value+','
            sql = sql + 'CREATE TABLE IF NOT EXISTS %s ('%tablename
            sql = sql + sql_mid
            sql = sql + constraint
            sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
            print('creatTable:'+sql)
            self.executeCommit(sql)
    
        def executeSql(self,sql=''):
            """执行sql语句,针对读操作返回结果集
    
                args:
                    sql  :sql语句
            """
            try:
                self.cur.execute(sql)
                records = self.cur.fetchall()
                return records
            except pymysql.Error as e:
                error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
                print(error)
    
        def executeCommit(self,sql=''):
            """执行数据库sql语句,针对更新,删除,事务等操作失败时回滚
    
            """
            try:
                self.cur.execute(sql)
                self.con.commit()
            except pymysql.Error as e:
                self.con.rollback()
                error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
                print("error:", error)
                return error
    
        def insert(self, tablename, params):
            """插入数据库
    
                args:
                    tablename  :表名字
                    key        :属性键
                    value      :属性值
            """
            key = []
            value = []
            for tmpkey, tmpvalue in params.items():
                key.append(tmpkey)
                if isinstance(tmpvalue, str):
                    value.append("\'" + tmpvalue + "\'")
                else:
                    value.append(tmpvalue)
            attrs_sql = '('+','.join(key)+')'
            values_sql = ' values('+','.join(value)+')'
            sql = 'insert into %s'%tablename
            sql = sql + attrs_sql + values_sql
            print('_insert:'+sql)
            self.executeCommit(sql)
    
        def select(self, tablename, cond_dict='', order='', fields='*'):
            """查询数据
    
                args:
                    tablename  :表名字
                    cond_dict  :查询条件
                    order      :排序条件
    
                example:
                    print mydb.select(table)
                    print mydb.select(table, fields=["name"])
                    print mydb.select(table, fields=["name", "age"])
                    print mydb.select(table, fields=["age", "name"])
            """
            consql = ' '
            if cond_dict!='':
                for k, v in cond_dict.items():
                    consql = consql+k + '=' + v + ' and'
            consql = consql + ' 1=1 '
            if fields == "*":
                sql = 'select * from %s where ' % tablename
            else:
                if isinstance(fields, list):
                    fields = ",".join(fields)
                    sql = 'select %s from %s where ' % (fields, tablename)
                else:
                    raise "fields input error, please input list fields."
            sql = sql + consql + order
            print('select:' + sql)
            return self.executeSql(sql)
    
        def insertMany(self,table, attrs, values):
            """插入多条数据
    
                args:
                    tablename  :表名字
                    attrs        :属性键
                    values      :属性值
    
                example:
                    table='test_mysqldb'
                    key = ["id" ,"name", "age"]
                    value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
                    mydb.insertMany(table, key, value)
            """
            values_sql = ['%s' for v in attrs]
            attrs_sql = '('+','.join(attrs)+')'
            values_sql = ' values('+','.join(values_sql)+')'
            sql = 'insert into %s'% table
            sql = sql + attrs_sql + values_sql
            print('insertMany:'+sql)
            try:
                print(sql)
                for i in range(0,len(values),20000):
                        self.cur.executemany(sql,values[i:i+20000])
                        self.con.commit()
            except pymysql.Error as e:
                self.con.rollback()
                error = 'insertMany executemany failed! ERROR (%s): %s' %(e.args[0],e.args[1])
                print(error)
    
        def delete(self, tablename, cond_dict):
            """删除数据
    
                args:
                    tablename  :表名字
                    cond_dict  :删除条件字典
    
                example:
                    params = {"name" : "caixinglong", "age" : "38"}
                    mydb.delete(table, params)
    
            """
            consql = ' '
            if cond_dict!='':
                for k, v in cond_dict.items():
                    if isinstance(v, str):
                        v = "\'" + v + "\'"
                    consql = consql + tablename + "." + k + '=' + v + ' and '
            consql = consql + ' 1=1 '
            sql = "DELETE FROM %s where%s" % (tablename, consql)
            print (sql)
            return self.executeCommit(sql)
    
        def update(self, tablename, attrs_dict, cond_dict):
            """更新数据
    
                args:
                    tablename  :表名字
                    attrs_dict  :更新属性键值对字典
                    cond_dict  :更新条件字典
    
                example:
                    params = {"name" : "caixinglong", "age" : "38"}
                    cond_dict = {"name" : "liuqiao", "age" : "18"}
                    mydb.update(table, params, cond_dict)
    
            """
            attrs_list = []
            consql = ' '
            for tmpkey, tmpvalue in attrs_dict.items():
                attrs_list.append("`" + tmpkey + "`" + "=" +"\'" + tmpvalue + "\'")
            attrs_sql = ",".join(attrs_list)
            print("attrs_sql:", attrs_sql)
            if cond_dict!='':
                for k, v in cond_dict.items():
                    if isinstance(v, str):
                        v = "\'" + v + "\'"
                    consql = consql + "`" + tablename +"`." + "`" + k + "`" + '=' + v + ' and '
            consql = consql + ' 1=1 '
            sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql)
            print(sql)
            return self.executeCommit(sql)
    
        def dropTable(self, tablename):
            """删除数据库表
    
                args:
                    tablename  :表名字
            """
            sql = "DROP TABLE  %s" % tablename
            self.executeCommit(sql)
    
        def deleteTable(self, tablename):
            """清空数据库表
    
                args:
                    tablename  :表名字
            """
            sql = "DELETE FROM %s" % tablename
            self.executeCommit(sql)
    
        def isExistTable(self, tablename):
            """判断数据表是否存在
    
                args:
                    tablename  :表名字
    
                Return:
                    存在返回True,不存在返回False
            """
            sql = "select * from %s" % tablename
            result = self.executeCommit(sql)
            if result is None:
                return True
            else:
                if re.search("doesn't exist", result):
                    return False
                else:
                    return True
    
    if __name__ == "__main__":
        mydb = MysqldbHelper()
        print(mydb.getVersion())
        table='test_mysqldb'
        attrs={'name':'varchar(200) DEFAULT NULL','age':'int(11) DEFAULT NULL'}
        constraint='PRIMARY KEY(`id`)'
        print(mydb.creatTable(table, attrs, constraint))
        params = {"name" : "caixinglong", "age" : "38"}
        mydb.insert('test_mysqldb', params)
        print(mydb.select(table))
        print(mydb.select(table, fields=["name", "age"]))
        print( mydb.select(table, fields=["age", "name"]))
        key = ["id" ,"name", "age"]
        value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [108 ,"liuqiao3", "28"]]
        mydb.insertMany(table, key, value)
        mydb.delete(table, params)
        cond_dict = {"name" : "liuqiao", "age" : "18"}
        mydb.update(table, params, cond_dict)
        # mydb.deleteTable(table)
        # mydb.dropTable(table)
        print(mydb.select(table+ "1"))
        print( mydb.isExistTable(table+ "1"))
    
    
    

    相关文章

      网友评论

          本文标题:python数据库类

          本文链接:https://www.haomeiwen.com/subject/vbtziltx.html