美文网首页技术栈
python数据库类封装

python数据库类封装

作者: 浪漫矢志不渝 | 来源:发表于2023-12-04 14:25 被阅读0次
import pymysql
from log import MyLog as Log


class DbModel:
    # 数据库连接对象
    __db = None
    # 游标对象
    __cursor = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self

    def __new__(self, *args, **kwargs):
        if not hasattr(self, '_instance'):
            self._instance = super().__new__(self)
            # 主机
            host = 'host' in kwargs and kwargs['host'] or 'localhost'
            # 端口
            port = 'port' in kwargs and kwargs['port'] or '3306'
            # 用户名
            user = 'user' in kwargs and kwargs['user'] or 'root'
            # 密码
            passwd = 'passwd' in kwargs and kwargs['passwd'] or '123456'
            # 数据库
            db = 'db' in kwargs and kwargs['db'] or 'mypython'
            # 编码
            charset = 'charset' in kwargs and kwargs['charset'] or 'utf8mb4'
            # 打开数据库连接
            Log.info('连接数据库')
            self.__db = pymysql.connect(host = host, port = int(port), user = user, passwd = passwd, db = db, charset = charset)
            # 创建一个游标对象 cursor
            # self.__cursor = self.__db.cursor()
            self.__cursor = self.__db.cursor(cursor = pymysql.cursors.DictCursor)
        return self._instance

    # 返回执行execute()方法后影响的行数
    def execute(self, sql):
        Log.info(sql)
        self.__cursor.execute(sql)
        rowcount = self.__cursor.rowcount
        return rowcount

    def query(self, sql):
        Log.info(sql)
        self.__cursor.execute(sql)
        results = self.__cursor.fetchall()
        return results

    # 增->返回新增ID
    def insert(self, **kwargs):
        table = kwargs['table']
        cell = kwargs['cell']
        del kwargs['table']
        sql = 'insert into %s set ' % table
        for k, v in cell.items():
            sql += "`%s`='%s'," % (k, v)
        sql = sql.rstrip(',')
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 获取自增id
            res = self.__cursor.lastrowid
            return res
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 删->返回影响的行数
    def delete(self, **kwargs):
        table = kwargs['table']
        where = kwargs['where']
        sql = 'DELETE FROM %s where %s' % (table, where)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 改->返回影响的行数
    def update(self, **kwargs):
        table = kwargs['table']
        cell = kwargs['cell']
        # del kwargs['table']
        kwargs.pop('table')

        where = kwargs['where']
        kwargs.pop('where')

        sql = 'update %s set ' % table
        for k, v in cell.items():
            sql += "`%s`='%s'," % (k, v)
        sql = sql.rstrip(',')
        sql += ' where %s' % where
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    def insert_many(self, **kwargs):
        table = kwargs.get('table')
        data = kwargs.get('data')
        query = ""
        duplicate = kwargs.get('duplicate')
        values = []
        for data_dict in data:
            if not query:
                columns = ', '.join('`{0}`'.format(k) for k in data_dict)
                place_holders = ', '.join('%s'.format(k) for k in data_dict)
                query = "INSERT INTO {0} ({1}) VALUES ({2})".format(table, columns, place_holders)
                if duplicate:
                    duplicateStr = ', '.join('`{0}`=VALUES(`{1}`)'.format(k, k) for k in duplicate)
                    query = query + " ON DUPLICATE KEY UPDATE {0}".format(duplicateStr)
            v = list(data_dict.values())[:]
            values.append(v)
        Log.info(query)
        try:
            # 执行SQL语句
            self.__cursor.executemany(query, values)
            # 提交到数据库执行
            self.__db.commit()
            # 影响的行数
            rowcount = self.__cursor.rowcount
            return rowcount
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 查->单条数据
    def fetchone(self, **kwargs):
        table = kwargs['table']
        # 字段
        field = 'field' in kwargs and kwargs['field'] or '*'
        # where
        where = 'where' in kwargs and 'where ' + kwargs['where'] or ''
        # order
        order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''

        sql = 'select %s from %s %s %s limit 1' % (field, table, where, order)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 使用 fetchone() 方法获取单条数据.
            data = self.__cursor.fetchone()
            return data
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 查->多条数据
    def fetchall(self, **kwargs):
        table = kwargs['table']
        # 字段
        field = 'field' in kwargs and kwargs['field'] or '*'
        # where
        where = 'where' in kwargs and 'where ' + kwargs['where'] or ''
        # order
        order = 'order' in kwargs and 'order by ' + kwargs['order'] or ''
        # limit
        limit = 'limit' in kwargs and 'limit ' + kwargs['limit'] or ''
        sql = 'select %s from %s %s %s %s' % (field, table, where, order, limit)
        Log.info(sql)
        try:
            # 执行SQL语句
            self.__cursor.execute(sql)
            # 使用 fetchone() 方法获取单条数据.
            data = self.__cursor.fetchall()
            return data
        except Exception as e:
            Log.error(str(e))
            # 发生错误时回滚
            self.__db.rollback()

    # 析构函数,释放对象时使用
    def __del__(self):
        # 关闭数据库连接
        self.__db.close()
        print('关闭数据库连接')

"""
封装log方法
"""

import os
import time
import platform
import logging.handlers

# 日志打印等级
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL
}

# 创建一个日志
logger = logging.getLogger()
level = 'default'


# 创建日志文件方法
def create_file(filename):
    path = filename[0:filename.rfind('/')]
    if not os.path.isdir(path):
        os.makedirs(path)
    if not os.path.isfile(filename):
        fd = open(filename, mode = 'w', encoding = 'utf-8')
        fd.close()
    else:
        pass


# 给logger添加handler 添加内容到日志句柄中
def set_handler(levels):
    if levels == 'error':
        logger.addHandler(MyLog.err_handler)
    logger.addHandler(MyLog.handler)


# 在记录日志之后移除句柄
def remove_handler(levels):
    if levels == 'error':
        logger.removeHandler(MyLog.err_handler)
    logger.removeHandler(MyLog.handler)


def get_current_time():
    return time.strftime(MyLog.date, time.localtime(time.time()))


class MyLog:
    path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    host_os = platform.system()
    if host_os == 'Linux':
        log_file = r'/var/log/info.log'
        err_file = r'/var/log/err.log'
    else:
        log_file = path + '/Log/info.log'
        err_file = path + '/Log/err.log'
    logger.setLevel(LEVELS.get(level, logging.NOTSET))
    create_file(log_file)
    create_file(err_file)
    date = '%Y-%m-%d %H:%M:%S'

    # 创建一个handler,用于写入日志文件
    handler = logging.FileHandler(log_file, encoding = 'utf-8')
    err_handler = logging.FileHandler(err_file, encoding = 'utf-8')

    @staticmethod
    def debug(log_meg):
        set_handler('debug')
        # 文件中输出模式
        logger.debug("[" + get_current_time() + "] - [DEBUG]:" + log_meg)
        remove_handler('debug')

    @staticmethod
    def info(log_meg):
        set_handler('info')
        logger.info("[" + get_current_time() + "] - [INFO]:" + log_meg)
        remove_handler('info')

    @staticmethod
    def warning(log_meg):
        set_handler('warning')
        logger.warning("[" + get_current_time() + "] - [WARNING]:" + log_meg)
        remove_handler('warning')

    @staticmethod
    def error(log_meg):
        set_handler('error')
        logger.error("[" + get_current_time() + "] - [ERROR]:" + log_meg)
        remove_handler('error')

    @staticmethod
    def critical(log_meg):
        set_handler('critical')
        logger.error("[" + get_current_time() + "] - [CRITICAL]:" + log_meg)
        remove_handler('critical')

    # # 设置控制台输出格式
    # formatter = logging.Formatter(
    #     '[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
    # # 再创建一个handler,用于输出到控制台
    # console = logging.StreamHandler()
    # console.setFormatter(formatter)
    # logger.addHandler(console)
    # console.setLevel(logging.NOTSET)


if __name__ == "__main__":
    MyLog.debug("This is debug message")
    MyLog.info("This is info message")
    MyLog.warning("This is warning message")
    MyLog.error("This is error")
    MyLog.critical("This is critical message")

相关文章

网友评论

    本文标题:python数据库类封装

    本文链接:https://www.haomeiwen.com/subject/bxaqqrtx.html