sqlalchemy是python最为常用的第三方ORM模块。
1、pycharm安装sqlalchemy
命令: pip install sqlalchemy
2、sqlalchemy连接数据库
db_sqlalchemy.py模块代码:
参数都放在yaml中,此步省略...
HOST = db.get("host")
PORT = db.get("port")
USERNAME = db.get("user")
PASSWORD = db.get("passwd")
DBBASE = db.get("database")
OVERFLOW = db.get("max_overflow")
POOLSIZE = db.get("pool_size")
TIMEOUT = db.get("pool_timeout")
RECYCEL = db.get("pool_recycle")
DBMS = db.get("dbms")
DBAPI = db.get("dbapi")
CHARSET = db.get("db_charset")
DB_URI = f'{DBMS}+{DBAPI}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBBASE}'
# 创建引擎
engine = create_engine(
DB_URI,
# 超过链接池大小外最多创建的链接
max_overflow=OVERFLOW,
# 链接池大小
pool_size=POOLSIZE,
# 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
pool_timeout=TIMEOUT,
# 多久之后对链接池中的链接进行一次回收
pool_recycle=RECYCEL,
# 查看原生语句(未格式化)
echo=True,
encoding=CHARSET
)
class MySqlalchemySession():
def __enter__(self):
self.session = self.getSession()
def getSession(self):
# 绑定引擎
Session = sessionmaker(bind=engine, expire_on_commit=False)
session = scoped_session(Session)
return session
# 释放连接池资源
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
session = MySqlalchemySession().getSession()
3、创建映射对象
customer.py模块:
import datetime
import traceback
from common_util import reda_sql_data
from db_sqlalchemy import Base, session
from sqlalchemy import (
Column,
Integer,
String,
DateTime,
)
ms = reda_sql_data("XXX", "customer_list_by_id")
class customer(Base):
""" 必须继承Base """
# 数据库中存储的表名
__tablename__ = "customer"
# 对于必须插入的字段,采用nullable=False进行约束,它相当于NOT NULL
# 对于非必须插入的字段,不用采取nullable=False进行约束
id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
sa_code = Column(String(100), nullable=False, comment="sa_code ")
customer_no = Column(String(64), nullable=False, comment="编号")
status = Column(String(64), nullable=False, comment="status ")
create_time = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
last_modify_time = Column(DateTime, onupdate=datetime.datetime.now, comment="最后更新时间")
deleted = Column(Integer, comment="是否删除")
def __str__(self):
return f"<id:{self.id} customer_no :{self.customer_no }>"
4、使用原生SQL查询
def query_customer_list_by_id(sql,sa_code='AA', limit=1):
try:
conditions = dict()
base_sql_pre = sql
conditions.update({"sa_code": sa_code})
if status :
status _sql = " and cm.status = :status "
conditions.update({"status ": status })
else:
status_sql = ""
page_sql = " order by create_time desc limit :limit_size"
conditions.update({"limit_size": int(limit)})
# 组合sql
select_sql = base_sql_pre + status_sql + page_sql
print(select_sql)
cursor = session.execute(select_sql, conditions)
return cursor.fetchall()
# return res
except Exception:
traceback.print_exc()
5、测试
if __name__ == "__main__":
re = query_customer_list_by_id("select cm.customer_no from customer cm where cm.sa_code = :saas_tenant_code and ")
print(list(re[0])[0])
网友评论