美文网首页
Python 连接Mysql数据库

Python 连接Mysql数据库

作者: 乘风破浪的姐姐 | 来源:发表于2022-08-18 14:50 被阅读0次

1.pyCharm 安装连接mysql数据库的依赖
安装数据库驱动:pip install pymysql
安装连接池库:pip install dbutils

  1. 简单使用,无连接池配置
import pymysql.connections

class MysqlConnection():

    def __init__(self,db=None):
        self.db = pymysql.connect(
            host="1.0.0.1",
            user="admin",
            passwd="123",
            database="db"
        )

    def select(self,sql):
        try:
            mycursor = self.db.cursor()
            mycursor.execute(sql)

            myresult = mycursor.fetchall()  # fetchall() 获取所有记录

            rs_list = []
            for x in myresult:
                rs_list.append(x)
                # print(x)
            print(rs_list)
        except Exception:
            print("查询失败。")
        finally:
            mycursor.close()
            self.db.close()


if __name__ == '__main__':
    MysqlConnection().select("select code from activity a where   deleted =0 and name='AAA'")

3.使用数据库连接池,并进行封装,高级使用
db_collection.py 连接初始化模块
1)设置类变量 pool 池,
2)设置方法 getconn ,作用通过 dbutils 模块的 PooledDB 方法连接数据库,给 pool 赋值
3)设置方法 getMyConn,返回自身实例对象
核心作用:创建一个类,此类可以创建一个连接池 pool

#-*-coding:GBK -*-
 
from dbutils.pooled_db import PooledDB
import pymysql
# import db_config as config

"""
@功能:创建数据库连接池
"""

# 数据库信息
DB_TEST_HOST = "1.0.0.1"
DB_TEST_USER = "admin"
DB_TEST_PASSWORD = "123"
DB_TEST_DBNAME = "db"

# 数据库连接编码
DB_CHARSET = "utf8"

# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10

# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10

# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20

# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100

# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True

# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0

# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# creator : 使用连接数据库的模块
DB_CREATOR = pymysql

class MyConnectionPool(object):
    __pool = None


    # 创建数据库连接conn和游标cursor
    def __enter__(self):
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()

    # 创建数据库连接池
    def __getconn(self):
        if self.__pool is None:
            self.__pool = PooledDB(
                creator=DB_CREATOR,
                mincached=DB_MIN_CACHED,
                maxcached=DB_MAX_CACHED,
                maxshared=DB_MAX_SHARED,
                maxconnections=DB_MAX_CONNECYIONS,
                blocking=DB_BLOCKING,
                maxusage=DB_MAX_USAGE,
                setsession=DB_SET_SESSION,
                host=DB_TEST_HOST,
                user=DB_TEST_USER,
                passwd=DB_TEST_PASSWORD,
                db=DB_TEST_DBNAME,
                use_unicode=True,
                charset=DB_CHARSET
            )
        return self.__pool.connection()

    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()


    # 从连接池中取出一个连接
    def getconn(self):
        conn = self.__getconn()
        cursor = conn.cursor()
        return cursor, conn


# 获取连接池,实例化
def get_my_connection():
    return MyConnectionPool()

db_util.py
1)该模块,主要用于获取连接
2)封装执行命令,判断是否有参数和是否执行完就释放连接
3)执行语句,增/删/改/查,查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0

#-*-coding:GBK -*-
from db_collection import get_my_connection
 

class JDBCUtills(object):
    def __init__(self):
        self.db = get_my_connection()  # 从数据池中获取连接

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, 'inst'):  # 单例
            cls.inst = super(JDBCUtills, cls).__new__(cls, *args, **kwargs)
        return cls.inst

     # 封装执行命令
    def execute(self, sql, param=None, autoclose=False):
        """
        :param sql: 字符串类型,sql语句
        :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
        :param autoclose: 是否关闭连接
        :return: 返回连接conn和游标cursor
        """
        cursor, conn = self.db.getconn()  # 从连接池获取连接
        count = 0
        try:
            # count : 为改变的数据条数
            if param:
                count = cursor.execute(sql, param)
            else:
                count = cursor.execute(sql)
            conn.commit()
            if autoclose:
                self.close(cursor, conn)
        except Exception as e:
            pass
        return cursor, conn, count

   # 释放连接
    def close(self, cursor, conn):
        """释放连接归还给连接池"""
        cursor.close()
        conn.close()


   # 查询所有
    def selectall(self, sql, param=None):
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchall()
            return res
        except Exception as e:
            print(e)
            self.close(cursor, conn)
            return count

    # 查询单条
    def selectone(self, sql, param=None):
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchone()
            self.close(cursor, conn)
            return res
        except Exception as e:
            print("error_msg:", e.args)
            self.close(cursor, conn)
            return count

    # 增加
    def insertone(self, sql, param):
        try:
            cursor, conn, count = self.execute(sql, param)
            # _id = cursor.lastrowid()  # 获取当前插入数据的主键id,该id应该为自动生成为好
            conn.commit()
            self.close(cursor, conn)
            return count
            # 防止表中没有id返回0
            # if _id == 0:
            #     return True
            # return _id
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count

    # 增加多行
    def insertmany(self, sql, param):
        """
        :param sql:
        :param param: 必须是元组或列表[(),()]或((),())
        :return:
        """
        cursor, conn, count = self.db.getconn()
        try:
            cursor.executemany(sql, param)
            conn.commit()
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count

    # 删除
    def delete(self, sql, param=None):
        try:
            cursor, conn, count = self.execute(sql, param)
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count

    # 更新
    def update(self, sql, param=None):
        try:
            cursor, conn, count = self.execute(sql, param)
            conn.commit()
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
if __name__ == '__main__':
    result =JDBCUtills().selectone("select code from activity a where   deleted =0 and name='AAA'")
    print(result )

踩坑记录:
坑1,执行测试报错:SyntaxError: Non-UTF-8 code starting with '\xd0' in file
解决方案:在db_util.py 中头部加上

 #-*-coding:GBK -*-

坑2,再执行仍错:SyntaxError: (unicode error) 'utf-8' codec can't decode byte 0xc1 in position 1: invalid start byte
解决方案:在db_collection.py 中头部加上

 #-*-coding:GBK -*-

相关文章

网友评论

      本文标题:Python 连接Mysql数据库

      本文链接:https://www.haomeiwen.com/subject/yqyjgrtx.html