美文网首页
python 后台应用项目搭建(aiohttp)

python 后台应用项目搭建(aiohttp)

作者: BrianHsu | 来源:发表于2020-03-21 14:22 被阅读0次

    通过本文章,可能获得以下能力

    • python 异步关键词 async 的用法
    • python aiohttp 异步框架
    • SQLAlchemy 对数据库的支持
    • python mysql 异步操作

    阅读概览

    环境
    依赖
    项目结构
    项目主要内容

    环境

          python 3.6 or higher
          mysql  5.2.5 (未使用其他版本测试,感兴趣的话可以试着更换 mysql 版本,查看其兼容性)
          设备 Mac 
          开发工具  pycharm
    

    依赖包

         -   aiomysql==0.0.20
         -   ujson==1.35
         -   SQLAlchemy==1.3.8
         -   shortuuid==0.5.0
         -   aiohttp==3.5.4
         -   PyYAML==5.3.1
    

    包安装

        pip install aiohttp
        pip install aiomysql
        pip install sqlalchemy
        pip install ujson
        pip install PyYAML
        pip install shortuuid
    

    项目目录

    根目录为配置文件及源码

    .
    ├── README.md
    ├── config 
    │   ├── __init__.py
    │   ├── dev.yml
    │   └── setting.py
    ├── requirements.txt
    └── src
        ├── __init__.py
        ├── common
        │   ├── __init__.py
        │   └── mysql.py
        ├── constants
        │   ├── __init__.py
        │   └── const.py
        ├── model
        │   ├── __init__.py
        │   ├── base_option.py
        │   └── student.py
        ├── routes.py
        ├── server.py
        ├── service
        │   ├── __init__.py
        │   └── student_svc.py
        ├── utils
        │   ├── __init__.py
        │   ├── request_utils.py
        │   ├── response_utils.py
        │   └── time_utils.py
        └── views
            ├── __init__.py
            ├── status.py
            └── student_api.py
    
    
    

    项目内容

    • 启动入口 server.py
    # server.py
    from aiohttp import web
    
    from config.setting import config
    from src import create_app
    
    # Build the aiohttp application as soon as this module is imported.
    app = create_app()
    
    if __name__ == "__main__":
        # Serve on the loopback interface; the port comes from the project config.
        listen_port = config['base']['port']
        web.run_app(app, host='127.0.0.1', port=listen_port)
    
    • 初始化 __init__.py(位于 src 目录下,与 server.py 同级)

    用于初始化路由、数据库、日志、配置信息等操作

    import asyncio
    from aiohttp import web
    
    # 版本号
    from src.common.mysql import Mysql
    from config.setting import config
    from src.routes import set_up
    from src.utils.request_utils import request_id
    from src.utils.response_utils import success, error
    
    # Package version.
    __version__ = '1.0.0'
    
    
    async def _init_mysql(app):
        """Startup hook: create the shared MySQL engine on the serving event loop.
    
        Initialisation stays best-effort: a failure is logged and swallowed,
        and ``Mysql.get_engine()`` retries on first use while no engine is set.
        """
        try:
            await Mysql.init_engine()
        except Exception:
            app.logger.exception('mysql engine initialisation failed at startup')
    
    
    async def _close_mysql(app):
        """Cleanup hook: close the shared MySQL engine if one was created."""
        engine = Mysql.engine
        if engine is not None:
            engine.close()
            await engine.wait_closed()
            Mysql.engine = None
    
    
    def create_app():
        """Build and return the configured ``web.Application``.
    
        The application gets the response middleware, the project config under
        the ``'config'`` key, MySQL startup/cleanup hooks and all routes.
    
        The engine is created from an ``on_startup`` hook rather than with
        ``asyncio.ensure_future`` at call time. This function runs at import
        time, before any loop is running, so a task scheduled here could be
        bound to a loop the server never runs, and its failures would go
        unreported.
        """
        # Wrap every handler result in the common response envelope.
        my_app = web.Application(middlewares=[middleware])
        my_app['config'] = config
        # Tie the MySQL engine lifetime to the application lifetime.
        my_app.on_startup.append(_init_mysql)
        my_app.on_cleanup.append(_close_mysql)
        # Register routes.
        set_up(my_app)
        return my_app
    
    
    @web.middleware
    async def middleware(request, handler):
        """Wrap a handler's return value (or failure) in the common envelope.
    
        The request id from ``request_id(request)`` is stored under
        ``request['request_id']`` and passed to ``success`` / ``error``.
    
        * A truthy handler result is passed to ``success`` as the payload; a
          falsy one yields ``success`` without a payload.
        * ``ValueError`` is reported as ``RC_INVALID_PARAM``.
        * Any other ``Exception`` is logged with its traceback and reported as
          ``RC_INTERNAL_ERROR``.
        * ``web.HTTPException`` (404, redirects, ...) and task cancellation are
          re-raised untouched so aiohttp can handle them itself instead of them
          being reported as an internal error.
        * A handler that already returns a ``web.StreamResponse`` is passed
          through unchanged.
        """
        _request_id = request_id(request)
        try:
            request['request_id'] = _request_id
            data = await handler(request)
            if isinstance(data, web.StreamResponse):
                # Already a finished response; do not try to serialise it.
                return data
            resp = success(_request_id, data) if data else success(_request_id)
        except (asyncio.CancelledError, web.HTTPException):
            # CancelledError derives from Exception before Python 3.8, so it
            # must be let through explicitly.
            raise
        except ValueError as e:
            resp = error(_request_id, "RC_INVALID_PARAM", e.args[0] if e.args else 'params invalid')
        except Exception as e1:
            # Keep the traceback in the application log; the client only
            # receives the short message below.
            request.app.logger.exception('unhandled error, request_id=%s', _request_id)
            resp = error(_request_id, "RC_INTERNAL_ERROR", e1.args[0] if e1.args else 'inner error')
        return web.Response(body=resp.encode(), content_type='application/json')
    
    
    • 路由初始化 routes.py(注册各 api 的访问路径)
    from src.views.student_api import students, student, save_student
    from .views.status import status
    
    
    def set_up(app):
        """Register every HTTP route of the service on ``app.router``."""
        # (path, handler, route name) for the GET endpoints, in registration
        # order: service status, list all students, fetch a single student.
        # NOTE(review): '/student' has no variable segment, so
        # ``request.match_info`` carries no ``id`` for that route.
        get_routes = (
            ('/status', status, 'status'),
            ('/students', students, 'students'),
            ('/student', student, 'student'),
        )
        for path, handler, route_name in get_routes:
            app.router.add_get(path, handler, name=route_name)
    
        # Save a student (POST with a JSON body).
        app.router.add_post('/save', save_student, name='save_student')
    
    
    • api业务处理 views/student_api.py
    import asyncio
    
    from src.service import student_svc
    
    
    # asyncio.shield keeps the service call running even if the request task
    # is cancelled mid-flight, so a cancelled request does not leave the
    # database operation half-done.
    
    async def students(request):
        """Return the result of ``student_svc.students()``."""
        pending = student_svc.students()
        return await asyncio.shield(pending)
    
    
    async def save_student(request):
        """Decode the JSON request body and pass it to ``student_svc.save_student``."""
        payload = await request.json()
        pending = student_svc.save_student(payload)
        return await asyncio.shield(pending)
    
    
    async def student(request):
        """Return the result of ``student_svc.single_student`` for one id.
    
        The id is taken from the ``id`` path variable when the matched route
        defines one, otherwise from the ``id`` query-string parameter
        (``/student?id=...``). Without the fallback the id is always ``None``
        on a route pattern that has no ``{id}`` segment. ``None`` is still
        passed to the service when neither source provides an id.
        """
        st_id = request.match_info.get("id")
        if st_id is None:
            st_id = request.query.get("id")
        return await asyncio.shield(student_svc.single_student(st_id))
    
    
    • 数据库连接配置 common/mysql.py
    
    import aiomysql.sa
    
    from config.setting import config
    
    # Connection settings, read from the "mysql" section of the project config.
    mysql_config = config["mysql"]
    
    
    class Mysql(object):
        """Holder of the aiomysql SQLAlchemy engine shared through ``Mysql.engine``."""
    
        # Shared engine; stays None until init_engine() has completed.
        engine = None
    
        def __init__(self, engine):
            # Instance-level engine; the static helpers below only use the
            # class attribute.
            self.engine = engine
    
        @staticmethod
        async def init_engine():
            """Create the shared engine unless one already exists.
    
            Several coroutines may reach this point while the first connection
            is still being established. Only the first engine to finish is
            kept; any later one is closed instead of silently replacing the
            shared engine and leaking the earlier one.
            """
            if Mysql.engine is not None:
                return
            engine = await aiomysql.sa.create_engine(user=mysql_config["user"], db=mysql_config["database"],
                                                     host=mysql_config["host"], password=mysql_config["password"],
                                                     autocommit=True, connect_timeout=10, echo=False)
            if Mysql.engine is None:
                Mysql.engine = engine
            else:
                # Another caller finished first while this one was connecting.
                engine.close()
                await engine.wait_closed()
    
        @staticmethod
        async def get_engine():
            """Return the shared engine, creating it on first use."""
            if Mysql.engine is None:
                await Mysql.init_engine()
            return Mysql.engine
    
    
    • 数据库操作基本配置
    from src import Mysql
    
    
    class BaseOption:
        """Helpers that run statements on the engine provided by ``Mysql``."""
    
        @classmethod
        async def insert_or_update(cls, *sqls):
            """Execute ``sqls`` in order inside a single transaction.
    
            Returns:
                The string ``"success"`` once the transaction is committed.
    
            Raises:
                Exception: wraps any failure (chained to the original error).
                    When the transaction had been started it is rolled back
                    first.
            """
            try:
                engine = await Mysql.get_engine()
                async with engine.acquire() as conn:
                    raw_conn = conn.connection
                    # The engine hands out autocommit connections; switch it
                    # off for the duration of the explicit transaction.
                    await raw_conn.autocommit(False)
                    trans = None
                    try:
                        trans = await conn.begin()
                        for sql in sqls:
                            await conn.execute(sql)
                        await trans.commit()
                        return "success"
                    except Exception:
                        if trans is not None:
                            await trans.rollback()
                        raise
                    finally:
                        # Nested finally: the connection is closed even when
                        # restoring autocommit itself fails.
                        try:
                            await raw_conn.autocommit(True)
                        finally:
                            # NOTE(review): closing the raw connection means the
                            # pool cannot reuse it — presumably deliberate;
                            # confirm before removing.
                            raw_conn.close()
            except Exception as err:
                # Callers receive a plain Exception, wrapped once, with the
                # original error attached as __cause__.
                raise Exception(err) from err
    
        @classmethod
        async def query(cls, sql):
            """Execute a single statement and return what ``conn.execute`` returns.
    
            NOTE(review): the raw connection is closed before the result is
            handed back, so callers fetch rows from a result whose connection
            is already closed. This presumably relies on the rows being
            buffered by the cursor — confirm.
    
            Raises:
                Exception: wraps any failure (chained to the original error).
            """
            try:
                engine = await Mysql.get_engine()
                async with engine.acquire() as conn:
                    try:
                        return await conn.execute(sql)
                    finally:
                        conn.connection.close()
            except Exception as err:
                raise Exception(err) from err
    
    
    
    • 数据库操作
    from sqlalchemy import BigInteger, SMALLINT, Integer
    from sqlalchemy import String
    from sqlalchemy import Column
    from sqlalchemy import Table
    from sqlalchemy import MetaData
    
    from src.constants import const
    from src.model.base_option import BaseOption
    from src.utils.time_utils import current_sec_time
    
    
    class Student(BaseOption):
        """Data-access helper for the ``student`` table."""
    
        def __init__(self):
            meta = MetaData()
            self.student_tb = Table(
                "student",
                meta,
                Column(const.Student.ID, Integer, primary_key=True),
                Column(const.Student.NAME, String(40)),
                Column(const.Student.CLASS, String(40)),
                Column(const.Student.STATUS, SMALLINT, default=1),
                # Pass the callable itself: calling it here would freeze the
                # timestamp at the moment this table object is built.
                Column(const.Student.CREATE_TIME, BigInteger, default=current_sec_time),
                Column(const.Student.UPDATE_TIME, BigInteger, default=current_sec_time)
            )
    
        async def insert_student(self, student_obj):
            """Insert one student row built from ``student_obj``.
    
            Reads ``name``, ``st_class`` and ``status`` (default 1) with
            ``.get``. Both timestamps are set to the same current time.
            Returns the result of ``insert_or_update``.
            """
            # Read the clock once so create_time and update_time cannot differ.
            now = current_sec_time()
            sql = self.student_tb.insert().values(
                name=student_obj.get("name"),
                st_class=student_obj.get("st_class"),
                status=student_obj.get("status", 1),
                create_time=now,
                update_time=now)
            return await self.insert_or_update(sql)
    
        async def get_update_student_sql(self, student_obj):
            """Update the row whose id equals ``student_obj[const.Student.ID]``.
    
            The statement is awaited here; previously the un-awaited coroutine
            was returned, so awaiting this method did not run the update.
            ``create_time`` is no longer overwritten: an update only refreshes
            ``update_time``. Returns the result of ``query``.
            """
            sql = self.student_tb.update().values(
                name=student_obj.get("name"),
                st_class=student_obj.get("st_class"),
                status=student_obj.get("status"),
                update_time=current_sec_time()
            ).where(self.student_tb.c.id == student_obj.get(const.Student.ID))
            return await self.query(sql)
    
        async def status(self, st_id):
            """Delete the row with id ``st_id`` via ``insert_or_update``.
    
            NOTE(review): the method name and error message talk about a status
            update, but the statement is a DELETE — confirm which is intended.
            """
            try:
                sql = self.student_tb.delete().where(self.student_tb.c.id == st_id)
                return await self.insert_or_update(sql)
            except Exception as e:
                raise Exception('update status failed, error info:{}.'.format(e)) from e
    
        async def select_student_by_st_id(self, st_id):
            """Return the rows with id ``st_id`` as a list of dicts, or None if none match."""
            try:
                sql = self.student_tb.select().where(self.student_tb.c.id == st_id)
                cursor = await self.query(sql)
                records = await cursor.fetchall()
                return [dict(r) for r in records] if records else None
            except Exception as e:
                raise Exception('select_by_id failed, error info:{}.'.format(e)) from e
    
        async def select_students(self):
            """Return every row as a list of dicts, or None when the table is empty."""
            try:
                sql = self.student_tb.select()
                cursor = await self.query(sql)
                records = await cursor.fetchall()
                return [dict(r) for r in records] if records else None
            except Exception as e:
                # The message names this method; it used to say "select_by_id".
                raise Exception('select_students failed, error info:{}.'.format(e)) from e
    
    

    具体代码移步github aiohttp-example

    相关文章

      网友评论

          本文标题:python 后台应用项目搭建(aiohttp)

          本文链接:https://www.haomeiwen.com/subject/tkfyyhtx.html