SQLAlchemy学习笔记(版权所有,翻版必究)
一、实例化数据库链接session
- 参数
-
- DATABASE_URI(数据库地址):mysql://root:Yunji@2016@127.0.0.1:3306/flask?charset=utf8
- convert_unicode(True):设置str、boolean类型数据的默认行为
- echo:engine的log行为,默认False,可修改为True或者Debug,标准输出到控制台
- pool_size:数据库连接池大小,默认None,为5个连接数
- pool_recycle:数据库连接的自动回收时间
-
- autocommit:自动提交
- autoflush:刷新session
- bind:绑定engine
- 方式
- 方式1:不绑定app
engine = create_engine(DATABASE_URI, convert_unicode=True, pool_size=50, pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))
db.session = session()
- 方式2:绑定flask app
# store.py
from flask.ext.sqlalchemy import SQLAlchemy
db = SQLAlchemy()
----------------------------------------------------------------
# app.py
def create_app(config_name=None):
app = Flask(__name__)
db.app = app
db.init_app(app)
return app
app = create_app()
----------------------------------------------------------------
# handler.py
from store import db
result = db.session.query(ModelA).all()
二、Model定义
import db
所有model类继承db.Model
-
- int
db.Column(db.Integer, primary_key=True)
- string
db.Column(db.String(50), nullable=False, unique=True)
- time
-
db.Column(db.TIMESTAMP, default=datetime.now(), nullable=False)
# 不能插入timestamp=0的数据 db.Column(db.DateTime(), nullable=False, server_default = func.now())
-
- boolean
db.Column(db.Boolean, nullable=True, default=True)
- float
db.Column(db.Float, nullable=False)
- int
-
- 主键 primary = True/False
- 是否为空 nullable = True/False
- 是否唯一 unique = True/False
- 默认值 default = 0
- 外键 foreignkey
- 级联删除 ondelete='CASCADE'
# parentModel # 定义父级关联的子集model from sqlalchemy.orm import relationship children = db.relationship("hostAssets", cascade="all, delete-orphan", passive_deletes=True) # children Model # 定义外键字段和外键删除规则 from sqlalchemy import ForeignKey business_id = db.Column(db.Integer, db.ForeignKey('business.id', ondelete='CASCADE'), nullable=True)```
三、Result序列化
- marshmallow
- model增加to_dict方法
# ModelA.py
model中绑定__table__的信息
__tablename__ = 'model_a'
def to_dict(self):
return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}
result = session.query().first()
result.to_dict()
四、Function
- get 查询
- __dict__
_sa_instance:为内置的属性
- query() 查询
-
all()
ret = session.query(User).all()
得到的结果是model实例对象的列表,如果查询不到,返回空list,可以用ret.column获取指定字段的value -
first()
ret = session.query(User).first()
得到的结果是model实例对象,可以用ret.column获取指定字段的value,等同于one_or_none(),如果查询不到,返回None -
scalar()
ret = session.query(User.name).scalar()
得到的是name字段的value -
get(id)
ret = session.query(User).get(5)
得到的结果是model实例对象,可以用ret.column获取指定字段的value,get的内容必须是主键
-
- update
参数:
synchronize_session=False 不同步更新当前session
synchronize_session= fetch 更新之前进行查询,获取最新的更新对象
db.session.query(hostAssets).filter(hostAssets.business_id == ids).update({"business_id": None},
synchronize_session=False)
db.session.commit()
- merge(obiect)
net_mission = get_object_or_404(NetMission, data['id']) # get_object_or_404 可换为普通的query语句
net_mission.mission_name = data['mission_name']
db.session.merge(net_mission)
db.session.commit()
- delete
参数:
synchronize_session=False 不同步更新当前session
synchronize_session= fetch 更新之前进行查询,获取最新的更新对象
db.session.query(hostAssetsTrend).filter(hostAssetsTrend.tid == ids).delete(synchronize_session=False)
PingResultHours.query.filter(PingResultHours.create_time < time_1).delete(synchronize_session='fetch')
db.session.commit()
- delete(object)
net_mission = get_object_or_404(NetMission, data['id']) # get_object_or_404 可换为普通的query语句
db.session.delete(net_mission)
db.session.commit()
- add 添加
- 方式一
_ = PingResult(source_ip=ret['source_ip'], order_ip=ret['order_ip'],
delay=round(float(ret['delay']), 2), loss=round(float(ret['loss']), 2),
create_time=datetime.now())
db.session.add(_)
db.session.commit()
- 方式二
db.session.execute(PingResult.__table__.insert(),[{"business_name":"A"},{"business_name":"B"}])
db.session.commit()
-
commit
db.session.commit()
-
filter
from sqlalchemy import and\_, or\_, desc
- and_
hosts = model.query.filter(and_(model.source_ip == data['source_ip'],
model.order_ip == data['order_ip'],
model.create_time >= start_time)).order_by('create_time')
- or_
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
- between
User.query.filter(User.id.between(1, 10)).all()
- like
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
- order_by, desc, asc
latest_data = hosts.order_by(desc('create_time')).limit(1).first()
latest_data = hosts.order_by(hosts.create_time.desc()).limit(1).first()
- in_
hardwareFault.query.filter(hardwareFault.host_sn.in_([1,2,3,4,5,6])).all()
-
rollback
db.session.rollback()
-
close
db.session.close()
-
fllush
db.session.flush()
五、高级查询
- label
- 类似于sql中的as,可以对查询的字段或者运算的结果重命名
- group_by
- 可以group多个字段,即为要同时符合多个条件,结果可以用(result.字段名)获取
result = db.session.query(PingResultHours.source_ip, PingResultHours.order_ip, func.avg(
PingResultHours.delay).label('delay'), func.avg(PingResultHours.loss).label('loss')).filter(
PingResultHours.create_time > time_1, PingResultHours.create_time <= current_time).group_by(
PingResultHours.source_ip, PingResultHours.order_ip).all()
- union,union_all
相当于sql中的union,对相同字段的结果做拼接 - join
records = db.session.query(faultRepairRecord).
join(hostAssets, faultRepairRecord.host_sn == .order_by(faultRepairRecord.create_time.desc()).
order_by('id').offset(start).limit(result['page_size']).all()
- limit
限制返回的结果数量 - skip
要跳过的项数,SKIP 不能脱离 ORDER BY 子句单独使用 - distinct
对字段去重
records = db.session.query(
distinct(performanceTrend.create_time),performanceTrend.create_time,performanceTrend.cpu_ratio,
performanceTrend.cpu_max,performanceTrend.memory_ratio,performanceTrend.hdd_ratio).
filter(and_(performanceTrend.create_time >= new_starttime, performanceTrend.create_time <= new_endtime)).all()
六、Sql执行方式
- session.query
- session.execute
条件变量可以通过dict或者%s做替换
records = db.session.execute(
"select avg(type) as datatype,DATE_FORMAT(create_time,'%Y-%m-%d')as createTime,avg(cpu_ratio) as cpu_ratio,
max(cpu_max) as cpu_max,avg(memory_ratio) as memory_ratio, avg(hdd_ratio) as hdd_ratio
from performance_trend where type=1 and
(create_time > :stime and create_time < :etime) group by createTime",{"stime":new_starttime,"etime":new_endtime}).fetchall()
七、內建func函数
- time类
- 对查询的time字段做格式化输出
func.date_format(PingResult.create_time,'%Y-%m-%d').label('create_time'))
- 对查询的time字段做格式化输出
- math类
- func.sum()
- func.avg()
- func.count()
- func.max()
- func.min()
网友评论