美文网首页
oslo sqlalchemy 使用范例

oslo sqlalchemy 使用范例

作者: 笨手笨脚越 | 来源:发表于2017-07-24 09:31 被阅读198次

    oslo.db 是 oslo 系列工具库中专门用于管理数据库连接与会话的工具库,主要是对 SQLAlchemy 做进一步的封装。
    GitHub 地址: https://github.com/openstack/oslo.db

    使用:

    一、定义models:

    from sqlalchemy import Column, Integer, String, Float, SmallInteger, ForeignKey
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import backref, column_property, relationship, validates
    
    # Declarative base class shared by the ORM models defined below.
    BASE = declarative_base()
    
    
    class Country(BASE):
        """ORM mapping for the ``country`` table.

        ``Code`` is the primary key; ``city.CountryCode`` refers to it.

        The floating point columns were originally declared as
        ``Float(10, 2)`` / ``Float(3, 1)``.  The second positional argument
        of SQLAlchemy's ``Float`` is ``asdecimal``, not a scale, so those
        calls silently enabled Decimal results with the default return
        scale.  The declarations below keep the Decimal results but spell
        the flag out and request the scale the original numbers suggest.
        """
        __tablename__ = "country"
        Code = Column(String(3), nullable=False, primary_key=True)
        Name = Column(String(52), nullable=False)
        Continent = Column(String(25), nullable=False)
        Region = Column(String(26), nullable=False)
        SurfaceArea = Column(
            Float(precision=10, asdecimal=True, decimal_return_scale=2),
            nullable=False)
        IndepYear = Column(SmallInteger, nullable=False)
        Population = Column(Integer, nullable=False)
        LifeExpectancy = Column(
            Float(precision=3, asdecimal=True, decimal_return_scale=1))
        GNP = Column(
            Float(precision=10, asdecimal=True, decimal_return_scale=2))
        GNPOld = Column(
            Float(precision=10, asdecimal=True, decimal_return_scale=2))
        LocalName = Column(String(45), nullable=False)
        GovernmentForm = Column(String(45), nullable=False)
        HeadOfState = Column(String(60))
        # NOTE(review): presumably the ID of the capital city; no foreign
        # key is declared here -- confirm against the schema.
        Capital = Column(Integer)
        Code2 = Column(String(2), nullable=False)
    
    
    class City(BASE):
        """ORM mapping for the ``city`` table."""
        __tablename__ = "city"
        # Integer primary key.
        ID = Column(Integer, primary_key=True)
        Name = Column(String(35), nullable=False)
        # Foreign key to country.Code; this is what lets queries join
        # City to Country without an explicit ON clause.
        CountryCode = Column(String(3), ForeignKey('country.Code'), nullable=False)
        District = Column(String(20), nullable=False)
        Population = Column(Integer, nullable=False)
    

    一、常用操作

    # -*- coding:utf-8 -*-
    
    from oslo_db import exception as db_exc
    from oslo_db import options
    from oslo_db.sqlalchemy import session as db_session
    
    import sqlalchemy
    from sqlalchemy import MetaData
    from sqlalchemy import or_, and_, case
    from sqlalchemy.orm import joinedload, joinedload_all, undefer_group
    from sqlalchemy.orm import RelationshipProperty
    from sqlalchemy import sql
    from sqlalchemy.sql.expression import bindparam
    from sqlalchemy.sql.expression import desc
    from sqlalchemy.sql.expression import literal_column
    from sqlalchemy.sql.expression import true
    from sqlalchemy.sql import func
    from sqlalchemy.sql import sqltypes
    from sqlalchemy import text
    
    from models import City, Country
    
    # Module-level cache for the EngineFacade; populated on first use by
    # _create_facade_lazily().
    _FACADE = None
    
    
    def _create_facade_lazily(connection=None):
        """Return the shared EngineFacade, creating it on first use.

        :param connection: optional SQLAlchemy connection URL.  When omitted,
            the ``OSLO_DB_CONNECTION`` environment variable is used if set,
            otherwise the original hard-coded local URL.  The value is only
            consulted when the facade is first created; later calls return
            the cached facade and ignore it.
        :return: the module-wide ``EngineFacade`` instance.
        """
        global _FACADE
        if _FACADE is None:
            if connection is None:
                # Local import keeps this block self-contained.
                import os
                # Credentials should not live in source; allow overriding
                # them from the environment while keeping the old default.
                connection = os.environ.get(
                    'OSLO_DB_CONNECTION',
                    'mysql+pymysql://root:wyue@localhost:3306/world?charset=latin1'
                )
            _FACADE = db_session.EngineFacade(connection)

        return _FACADE
    
    
    def get_engine():
        """Return the engine of the lazily created facade.

        :return: whatever ``get_engine()`` of the shared facade returns.
        """
        return _create_facade_lazily().get_engine()
    
    
    def get_session(**kwargs):
        """Return a session obtained from the lazily created facade.

        :param kwargs: forwarded unchanged to the facade's ``get_session``.
        :return: whatever ``get_session(**kwargs)`` of the facade returns.
        """
        return _create_facade_lazily().get_session(**kwargs)
    
    
    def dispose_engine():
        """Call ``dispose()`` on the engine returned by ``get_engine()``."""
        engine = get_engine()
        engine.dispose()
    
    
    def print_city_info(city=None, country=None):
        """Print a dict summarising a city and/or a country.

        :param city: object exposing ``ID``, ``Name``, ``CountryCode``,
            ``District`` and ``Population``; skipped when falsy.
        :param country: object exposing ``Code``, ``Name``, ``Continent``,
            ``Region``, ``SurfaceArea`` and ``IndepYear``; skipped when falsy.
        :return: None; the dict is written to standard output.
        """
        info = {}
        if city:
            info['city'] = {
                'id': city.ID,
                'name': city.Name,
                'country_code': city.CountryCode,
                'district': city.District,
                'population': city.Population,
            }
        if country:
            info['country'] = {
                'code': country.Code,
                'name': country.Name,
                'continent': country.Continent,
                'region': country.Region,
                'surfaceArea': country.SurfaceArea,
                'IndepYear': country.IndepYear,
            }
        # Single-argument call form: valid on both Python 2 and Python 3.
        print(info)
    
    
    def query_city_all():
        """Query every ``City`` row and print each one.

        The SQL text of the query is printed before it is executed.

        :return: None
        """
        session = get_session()
        with session.begin():
            query = session.query(City)
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_filter1():
        """Query with a single condition: cities whose name is 'Gaza'.

        :return: None
        """
        session = get_session()
        with session.begin():
            query = session.query(City).filter(City.Name == 'Gaza')
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_filter2():
        """Query with several equality conditions passed to ``filter_by``.

        Selects cities with CountryCode 'GBR' and District 'England'.

        :return: None
        """
        session = get_session()
        with session.begin():
            condition = {'CountryCode': 'GBR', 'District': 'England'}
            query = session.query(City).filter_by(**condition)
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_AND():
        """Query using ``and_`` to combine three conditions.

        :return: None
        """
        session = get_session()
        with session.begin():
            conditions = [
                City.CountryCode == 'GBR',
                City.District == 'England',
                City.Name == 'Birkenhead',
            ]
            query = session.query(City).filter(and_(*conditions))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_OR():
        """Query using ``or_`` to combine two conditions.

        :return: None
        """
        session = get_session()
        with session.begin():
            conditions = [
                City.District == 'England',
                City.Name == 'Birkenhead',
            ]
            query = session.query(City).filter(or_(*conditions))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_AND_OR():
        """Query combining ``and_`` with a nested ``or_``.

        Selects cities in 'GBR' whose name is 'Maidstone' or whose
        population equals 91069.

        :return: None
        """
        session = get_session()
        with session.begin():
            name_or_population = or_(
                City.Name == 'Maidstone',
                City.Population == 91069
            )
            query = session.query(City).filter(
                and_(City.CountryCode == 'GBR', name_or_population))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_IN():
        """Query using the IN operator on a fixed list of city IDs.

        :return: None
        """
        session = get_session()
        with session.begin():
            ids = [1, 2, 3, 5, 6, 8]
            query = session.query(City).filter(City.ID.in_(ids))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_LIKE():
        """Pattern-match query using LIKE: city names starting with 'He'.

        :return: None
        """
        session = get_session()
        with session.begin():
            query = session.query(City).filter(City.Name.like('He%'))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_union():
        """Combine two queries with UNION and print the resulting cities.

        The first query matches names starting with 'He'; the second
        matches a fixed list of IDs.

        :return: None
        """
        session = get_session()
        with session.begin():
            by_name = session.query(City).filter(City.Name.like('He%'))
            ids = [1, 2, 3, 5, 6, 8]
            by_id = session.query(City).filter(City.ID.in_(ids))
            query = by_name.union(by_id)
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_with_exists():
        """Query cities using an EXISTS condition.

        The EXISTS clause requires a country named 'Aruba' whose code
        equals the city's CountryCode.

        :return: None
        """
        session = get_session()
        with session.begin():
            in_aruba = sql.exists().where(and_(
                City.CountryCode == Country.Code,
                Country.Name == 'Aruba'
            ))
            query = session.query(City).filter(in_aruba)
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def query_city_country_join():
        """Join ``City`` to ``Country`` and print the first matching row.

        Selects the city name and country name for the country named
        'Aruba'; only the first result is printed.

        :return: None
        """
        session = get_session()
        with session.begin():
            query = (session.query(City.Name, Country.Name)
                     .join(Country)
                     .filter(Country.Name == 'Aruba'))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            print(query.first())
    
    
    def query_city_group_by():
        """Demonstrate GROUP BY with COUNT, SUM and a HAVING clause.

        Groups cities by CountryCode and keeps the groups with more than
        10 cities and a total population above 85699060.

        :return: None
        """
        session = get_session()
        with session.begin():
            city_count = func.count(City.ID)
            total_population = func.sum(City.Population)
            query = (
                session.query(City.CountryCode, city_count, total_population)
                .group_by(City.CountryCode)
                .having(and_(
                    func.count(City.ID) > 10,
                    func.sum(City.Population) > 85699060))
            )
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            print(query.all())
    
    
    def query_city_sub_query():
        """Query countries through a subquery on ``city``.

        The subquery selects the CountryCode of every group of cities with
        more than 10 cities and a total population above 85699060; the
        outer query returns the countries whose Code is in that set.

        Shape of the SQL printed by this function (column list shortened)::

            SELECT country."Code" AS "country_Code", ...
            FROM country
            WHERE country."Code" IN (SELECT city."CountryCode"
            FROM city GROUP BY city."CountryCode"
            HAVING count(city."ID") > :count_1
               AND sum(city."Population") > :sum_1)

        :return: None
        """
        session = get_session()
        with session.begin():
            stmt = (
                session.query(City.CountryCode)
                .group_by(City.CountryCode)
                .having(and_(
                    func.count(City.ID) > 10,
                    func.sum(City.Population) > 85699060))
                .subquery()
            )
            query = session.query(Country).filter(Country.Code.in_(stmt))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for country in query.all():
                print_city_info(country=country)
    
    
    def query_city_with_sql():
        """Run a hand-written SQL statement through the session.

        Counts cities per CountryCode and prints the codes having more
        than 20 cities.

        :return: None
        """
        session = get_session()
        with session.begin():
            raw_sql = "select CountryCode as country_code, count(ID) cnt from city group by CountryCode having count(ID) > 20"
            # NOTE(review): plain strings as column names in query() are
            # kept exactly as in the original; newer SQLAlchemy releases
            # may require explicit column constructs here -- confirm
            # against the installed version.
            query = session.query('country_code', 'cnt').from_statement(text(raw_sql))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for row in query.all():
                # Convert explicitly so the %-formatting does not depend on
                # the row type being a tuple subclass.
                print('country_code : %s , cnt : %s ' % tuple(row))
    
    
    def query_city_order_limit():
        """Demonstrate ordering and paging.

        Orders cities by ID, skips the first 5 rows and prints the next 10.

        :return: None
        """
        session = get_session()
        with session.begin():
            query = session.query(City).order_by(City.ID).limit(10).offset(5)
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    def update_city():
        """Modify the population of the city with ID 1 in two ways.

        First through ``Query.update``, then by assigning to the attribute
        of the loaded object.  The row is printed before and after each
        step.

        :return: None
        """
        session = get_session()
        with session.begin():
            print('before update:')
            query = session.query(City).filter(City.ID == 1)
            print('SQL : %s' % str(query))
            print_city_info(query.first())

            print('1  after update:')
            # Variant 1: bulk update through the query.
            query.update({City.Population: 4022222})
            print_city_info(query.first())

            print('2  after update:')
            # Variant 2: plain attribute assignment on the loaded object.
            city = query.first()
            if city is not None:
                # Guard: first() returns None when no row has ID 1, and
                # assigning to None would raise AttributeError.
                city.Population = 10
            print_city_info(query.first())
    
    
    def add_city():
        """Insert a new city and print every city named like 'SanMing%'.

        :return: None
        """
        session = get_session()
        with session.begin():
            new_city = City()
            new_city.Name = 'SanMing2'
            new_city.CountryCode = 'CHN'
            new_city.District = 'Fujian'
            new_city.Population = 160691
            session.add(new_city)

            query = session.query(City).filter(City.Name.like('SanMing%'))
            # A separate loop name keeps ``new_city`` from being rebound.
            for city in query.all():
                print_city_info(city)
    
    
    def delete_city():
        """Delete the cities named 'SanMing2', then print what is left.

        After the delete, every city whose name matches 'SanMing%' is
        printed.

        :return: None
        """
        session = get_session()
        with session.begin():
            # The value returned by delete() was never used, so it is no
            # longer bound to a name that is immediately overwritten.
            session.query(City).filter(City.Name == 'SanMing2').delete()

            remaining = session.query(City).filter(City.Name.like('SanMing%'))
            for city in remaining.all():
                print_city_info(city)
    
    
    def delete_city_in():
        """Delete cities selected with an IN condition.

        Prints the IDs of cities named like 'SanMin%' first, then deletes
        the cities whose ID is in the range 4086..4095.

        :return: None
        """

        session = get_session()
        with session.begin():
            print('before delete')
            matching_ids = session.query(City.ID).filter(City.Name.like('SanMin%'))
            print(matching_ids.all())
            # Author's note: by default a bulk delete also tries to remove
            # the matching objects held by the session, and that step
            # apparently does not support IN, which caused an error.  The
            # workaround is to skip the synchronisation and expire the
            # session's entities afterwards (this function does not expire
            # them itself).  update() accepts the same argument; when a
            # commit follows immediately, synchronize_session=False is also
            # faster.
            ids = list(range(4086, 4096))
            session.query(City).filter(City.ID.in_(ids)).delete(synchronize_session=False)
    
    def query_city_with_between():
        """Query using the BETWEEN operator: city IDs between 8 and 10.

        This function used to be a second ``query_city_with_IN``, which
        replaced the IN example defined earlier in the module; it now has
        its own name.

        Shape of the SQL printed by this function (column list shortened)::

            SELECT city."ID" AS "city_ID", ...
            FROM city
            WHERE city."ID" BETWEEN :ID_1 AND :ID_2

        :return: None
        """
        session = get_session()
        with session.begin():
            query = session.query(City).filter(City.ID.between(8, 10))
            # print() with one argument works on Python 2 and Python 3.
            print('SQL : %s' % str(query))
            for city in query.all():
                print_city_info(city)
    
    
    if __name__ == '__main__':
        # Script entry point: runs only the delete-by-IN example.
        delete_city_in()
    
    
    

    二、遇到的错误:

    错误日志:

    oslo_db.exception.DBError: (pymysql.err.InternalError) (1093, u"You can't specify target table 'city' for update in FROM clause")
    [SQL: u'DELETE FROM city WHERE city.`ID` IN (SELECT city.`ID` \nFROM city \nWHERE city.`Name` = %(Name_1)s)'] [parameters: {u'Name_1': 'SanMing2'}]
    

    出错的代码:

        session = get_session()
        with session.begin():
            stmt = session.query(City.ID).filter(City.Name == 'SanMing2').subquery()
            session.query(City).filter(City.ID.in_(stmt)).delete(synchronize_session=False)
    

    原因:

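    MySQL 中 "You can't specify target table for update in FROM clause" 错误的意思是:不能在同一条语句中先从某张表 SELECT 出一些值,再对同一张表中的这些值执行 UPDATE 或 DELETE。常见的解决办法是先单独查询出要处理的 ID 列表,再用 in_(ids) 执行删除;或者在 SQL 中把子查询再包一层派生表(例如 SELECT ID FROM (SELECT ...) AS t)。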

    相关文章

      网友评论

          本文标题:oslo sqlalchemy 使用范例

          本文链接:https://www.haomeiwen.com/subject/dqwvkxtx.html