美文网首页
mysql常用操作封装

mysql常用操作封装

作者: 托贝多尔 | 来源:发表于2022-05-12 21:27 被阅读0次
    import time
    
    import pymysql
    from dbutils.pooled_db import PooledDB
    
    # Default settings used by MysqlPoolHandler, which unpacks this dict as
    # keyword arguments when it builds its PooledDB pool.
    # The 'xxx' values are placeholders to be replaced with real credentials.
    MIDPLAT_DB_CONFIG = {
        'host': 'localhost',
        'port': 3306,
        'user': 'xxx',
        'passwd': 'xxx',
        'db': 'xxx',
        'charset': 'utf8',
        'maxconnections': 10,
    }
    def split_list(data_list: list, sub_list_length: int) -> list:
        """
        Break a list into consecutive chunks of at most ``sub_list_length`` items.

        A list that already fits into a single chunk is not copied: the very
        same object is returned, wrapped in a one-element list.

        :param data_list: the list to split
        :param sub_list_length: maximum number of items per chunk
        :return: list of chunks, in the original order
        """
        total = len(data_list)
        if total > sub_list_length:
            return [
                data_list[start:start + sub_list_length]
                for start in range(0, total, sub_list_length)
            ]
        return [data_list]
    
    class MysqlPoolHandler:
        """
        Convenience wrapper around a DBUtils ``PooledDB`` connection pool.

        Every public method borrows one connection from the pool, runs its
        statement(s) and hands the connection back, also when the statement
        fails. The object can be used as a context manager; leaving the
        ``with`` block closes the pool.
        """

        def __init__(self, db_config=None):
            """
            Create the connection pool.

            :param db_config: keyword arguments for ``PooledDB``; when omitted,
                the module-level ``MIDPLAT_DB_CONFIG`` is used
            """
            # Resolve the default at call time rather than binding one shared
            # dict object into the signature when the class is defined.
            if db_config is None:
                db_config = MIDPLAT_DB_CONFIG
            self.mysql_pool_handler = PooledDB(creator=pymysql, **db_config, cursorclass=pymysql.cursors.DictCursor)

        @staticmethod
        def _quote_identifier(name) -> str:
            """Backtick-quote a column name for use in a parameterised statement."""
            # '`' is doubled so it cannot terminate the quoted identifier; '%' is
            # doubled because the statement is later %-formatted with its args.
            return "`" + str(name).replace("`", "``").replace("%", "%%") + "`"

        @staticmethod
        def _to_param(value):
            """
            Prepare a Python value for use as a query parameter.

            ``None`` and plain scalars are passed through so the driver can
            render them (``None`` becomes SQL ``NULL``). Anything else is
            stringified, which is what the former f-string based SQL did.
            """
            if value is None or isinstance(value, (str, bytes, int, float)):
                return value
            return str(value)

        def insert_mysql(self, sql: str, args=None) -> int:
            """
            Execute a data-modifying statement and commit it.

            Errors are printed and reported as ``0`` affected rows instead of
            being raised (best-effort behaviour kept from the original code).

            :param sql: statement to execute; may contain ``%s`` placeholders
            :param args: optional parameters for the placeholders
            :return: number of affected rows, or 0 when the statement failed
            """
            mysql_conn = None
            try:
                mysql_conn = self.mysql_pool_handler.connection()
                with mysql_conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    result = cursor.rowcount
                mysql_conn.commit()
                return result
            except Exception as e:
                print(e)
                if mysql_conn is not None:
                    # The error is swallowed, so make sure no half-done
                    # transaction stays open on the connection.
                    try:
                        mysql_conn.rollback()
                    except Exception as rollback_error:
                        print(rollback_error)
                return 0
            finally:
                # Always hand the connection back, also on failure.
                if mysql_conn is not None:
                    mysql_conn.close()

        def update_mysql(self, table: str, data: dict, where: dict,
                         db: str = None):
            """
            Update the rows of ``table`` that match ``where``.

            Values are sent as query parameters, never interpolated into the
            SQL text. Keys of ``data`` that also appear in ``where`` are not
            updated.

            :param table: table to update
            :param data: mapping of column name -> new value
            :param where: mapping of column name -> value the row must have;
                all conditions are combined with AND, ``None`` is matched
                with ``IS NULL``
            :param db: optional database name used to qualify ``table``
            :return: number of affected rows (0 when nothing was updated or
                the statement failed)
            """
            assert table, "`table` cannot be empty"
            assert where, "`where` cannot be empty"
            if not data:
                print("update empty data!")
                return 0
            set_columns = [x for x in data if x not in where]
            if not set_columns:
                # Every data key is also a filter key: there is nothing to SET,
                # and "UPDATE t SET  WHERE ..." would just be a syntax error.
                print("update empty data!")
                return 0

            args = [self._to_param(data.get(x)) for x in set_columns]
            key_value_str = ",".join([f"{self._quote_identifier(x)}=%s" for x in set_columns])

            where_parts = []
            for column in where:
                value = where.get(column)
                if value is None:
                    # "col = NULL" never matches in SQL.
                    where_parts.append(f"{self._quote_identifier(column)} IS NULL")
                else:
                    where_parts.append(f"{self._quote_identifier(column)}=%s")
                    args.append(self._to_param(value))
            where_str = " AND ".join(where_parts)

            # The table name is inserted as given (unquoted), as before; only
            # '%' is doubled because the statement is %-formatted with args.
            table_name = (f"{db}.{table}" if db else table).replace("%", "%%")
            update_sql = f"UPDATE {table_name} SET {key_value_str} WHERE {where_str}"
            print(update_sql)
            return self.insert_mysql(update_sql, args)

        def query_mysql(self, sql: str, args=None) -> list:
            """
            Run a query and fetch all rows.

            :param sql: query to execute
            :param args: optional parameters for ``%s`` placeholders in ``sql``
            :return: fetched rows, or an empty list when there are none
            :raises Exception: whatever the driver raises; the connection is
                handed back to the pool first
            """
            mysql_conn = self.mysql_pool_handler.connection()
            try:
                with mysql_conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    return cursor.fetchall() or []
            finally:
                mysql_conn.close()

        def _clean_value(self, value):
            """
            Backslash-escape a string for embedding in a single-quoted SQL literal.

            No longer used by this class (values are sent as query parameters);
            kept for callers that still build SQL text by hand.

            :param value: any value; non-strings are returned unchanged
            :return: the escaped string, or ``value`` itself
            """
            if isinstance(value, str):
                # Backslashes first, otherwise a trailing "\" or an existing
                # "\'" would neutralise the escape added for the quote.
                value = value.replace("\\", "\\\\").replace("'", "\\'")
            return value

        def batch_insert_mysql(self, table: str, fields: list, data: list, update_fields: list = None,
                               db: str = None) -> int:
            """
            Insert many rows, in chunks of at most 1000 rows per statement.

            Values are sent as query parameters; a missing key or ``None``
            is stored as SQL ``NULL``.

            :param table: table to insert into
            :param fields: ordered list of column names to fill
            :param data: list of dicts, one per row, keyed by column name
            :param update_fields: columns to overwrite when a unique key
                conflicts (adds ``ON DUPLICATE KEY UPDATE``)
            :param db: optional database name used to qualify ``table``
            :return: total number of affected rows over all chunks
            """
            assert table, "`table` cannot be empty"
            assert fields, "`fields` cannot be empty"
            if not data:
                print("insert empty data!")
                return 0

            field_str = ",".join([self._quote_identifier(x) for x in fields])
            # Table name is used as given; '%' doubled for %-formatting.
            table_name = (f"{db}.{table}" if db else table).replace("%", "%%")
            row_placeholder = "(" + ",".join(["%s"] * len(fields)) + ")"

            update_str = ""
            if update_fields:
                assignments = []
                for x in update_fields:
                    quoted = self._quote_identifier(x)
                    assignments.append(f"{quoted}=VALUES({quoted})")
                update_str = " ON DUPLICATE KEY UPDATE " + ",".join(assignments)

            result = 0
            for sub_data in split_list(data, 1000):
                value_str = ",".join([row_placeholder] * len(sub_data))
                # Flat parameter list, row by row, in `fields` order.
                args = [self._to_param(d.get(x)) for d in sub_data for x in fields]
                insert_sql = f"INSERT INTO {table_name}({field_str}) VALUES {value_str}" + update_str
                print(insert_sql)
                result += self.insert_mysql(insert_sql, args)
            return result

        def __enter__(self):
            """Return the handler itself for use in a ``with`` block."""
            return self

        def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
            """
            Close the pool when the ``with`` block ends.

            The ``with`` statement passes three exception arguments; they
            default to ``None`` so a direct ``handler.__exit__()`` call keeps
            working. Exceptions from the block are not suppressed.
            """
            self.mysql_pool_handler.close()
    
    
    def insert_mysql(mysql_pool, sql: str, args=None) -> int:
        """
        Execute a data-modifying statement on a pooled connection and commit it.

        :param mysql_pool: connection pool; must provide ``connection()``
        :param sql: statement to execute; may contain ``%s`` placeholders
        :param args: optional parameters for the placeholders
        :return: number of affected rows
        :raises Exception: whatever the driver raises; the connection is
            handed back to the pool first
        """
        mysql_conn = mysql_pool.connection()
        try:
            with mysql_conn.cursor() as cursor:
                cursor.execute(sql, args)
                result = cursor.rowcount
            mysql_conn.commit()
            return result
        finally:
            # Hand the connection back even when execute/commit failed, so a
            # failing statement does not leak a pool slot.
            mysql_conn.close()
    
    
    def query_mysql(mysql_pool, sql: str, args=None) -> list:
        """
        Run a query on a pooled connection and fetch all rows.

        :param mysql_pool: connection pool; must provide ``connection()``
        :param sql: query to execute
        :param args: optional parameters for ``%s`` placeholders in ``sql``
        :return: fetched rows, or an empty list when there are none
        :raises Exception: whatever the driver raises; the connection is
            handed back to the pool first
        """
        mysql_conn = mysql_pool.connection()
        try:
            with mysql_conn.cursor() as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall() or []
        finally:
            # Hand the connection back even when the query failed.
            mysql_conn.close()
    
    def raw_execute_sql(mysql_pool, sql: str):
        """
        Execute an arbitrary SQL statement as-is and commit it.

        :param mysql_pool: connection pool; must provide ``connection()``
        :param sql: any SQL statement; it is sent without parameters
        :return: the cursor's ``rowcount`` after execution
        :raises Exception: whatever the driver raises; the connection is
            handed back to the pool first
        """
        mysql_conn = mysql_pool.connection()
        try:
            with mysql_conn.cursor() as cursor:
                cursor.execute(sql)
                result = cursor.rowcount
            mysql_conn.commit()
            return result
        finally:
            # Hand the connection back even when execute/commit failed.
            mysql_conn.close()
    
    def _clean_value(value):
        """
        Backslash-escape a string for embedding in a single-quoted SQL literal.

        :param value: any value; non-strings are returned unchanged
        :return: the escaped string, or ``value`` itself
        """
        if isinstance(value, str):
            # Backslashes must be escaped first: otherwise a trailing "\" or an
            # existing "\'" in the input neutralises the escape added for the
            # quote and the value breaks out of the literal.
            value = value.replace("\\", "\\\\").replace("'", "\\'")
        return value
    
    
    def batch_insert_mysql(mysql_pool, table: str, fields: list, data: list, update_fields: list = None,
                           db: str = None) -> int:
        """
        Insert many rows, in chunks of at most 1000 rows per statement.

        Values are sent as query parameters instead of being interpolated into
        the SQL text, so quotes and backslashes in the data cannot alter the
        statement, and a missing key or ``None`` is stored as SQL ``NULL``
        rather than the string ``'None'``. Each chunk is committed on its own.

        :param mysql_pool: connection pool; must provide ``connection()``
        :param table: table to insert into
        :param fields: ordered list of column names to fill
        :param data: list of dicts, one per row, keyed by column name
        :param update_fields: columns to overwrite when a unique key conflicts
            (adds ``ON DUPLICATE KEY UPDATE``)
        :param db: optional database name used to qualify ``table``
        :return: total number of affected rows over all chunks
        :raises Exception: whatever the driver raises; chunks committed before
            the failure stay committed
        """
        assert table, "`table` cannot be empty"
        assert fields, "`fields` cannot be empty"
        if not data:
            print("insert empty data!")
            return 0

        def quote_identifier(name) -> str:
            # '`' doubled so it cannot end the identifier; '%' doubled because
            # the statement is %-formatted with its parameters.
            return "`" + str(name).replace("`", "``").replace("%", "%%") + "`"

        def to_param(value):
            # Scalars and None go to the driver untouched; anything else is
            # stringified, as the former f-string based SQL did.
            if value is None or isinstance(value, (str, bytes, int, float)):
                return value
            return str(value)

        field_str = ",".join([quote_identifier(x) for x in fields])
        # Table name is used as given (unquoted), as before.
        table_name = (f"{db}.{table}" if db else table).replace("%", "%%")
        row_placeholder = "(" + ",".join(["%s"] * len(fields)) + ")"

        update_str = ""
        if update_fields:
            assignments = [f"{quote_identifier(x)}=VALUES({quote_identifier(x)})" for x in update_fields]
            update_str = " ON DUPLICATE KEY UPDATE " + ",".join(assignments)

        result = 0
        for sub_data in split_list(data, 1000):
            value_str = ",".join([row_placeholder] * len(sub_data))
            # Flat parameter list, row by row, in `fields` order.
            args = [to_param(d.get(x)) for d in sub_data for x in fields]
            insert_sql = f"INSERT INTO {table_name}({field_str}) VALUES {value_str}" + update_str
            print(insert_sql)

            # Executed here rather than through the module-level insert_mysql,
            # because the parameters have to be passed along with the statement.
            mysql_conn = mysql_pool.connection()
            try:
                with mysql_conn.cursor() as cursor:
                    cursor.execute(insert_sql, args)
                    result += cursor.rowcount
                mysql_conn.commit()
            finally:
                # Hand the connection back even when the chunk failed.
                mysql_conn.close()
        return result
    
    
    

    相关文章

      网友评论

          本文标题:mysql常用操作封装

          本文链接:https://www.haomeiwen.com/subject/bnpeurtx.html