通过本文章,可能获得以下能力
- python 异步关键词 async 的用法
- python aiphttp 异步框架
- sqlarchemy 对数据的支持
- python mysql 异步操作
阅读概览
环境
依赖
项目结构
项目主要内容
环境
python 3.6 or highter
mysql 5.2.5 (未使用其他版本测试,感兴趣的话可以试着更换 mysql 版本,查看其兼容性)
设备 Mac
开发工具 pycharm
依赖包
- aiomysql==0.0.20
- ujson==1.35
- SQLAlchemy==1.3.8
- shortuuid==0.5.0
- aiohttp==3.5.4
- PyYAML==5.3.1
包安装
pip install aiohttp
pip install aiomysql
pip install sqlalchemy
pip install ujson
pip install PyYAML
pip install shortuuid
项目目录
根目录为配置文件及源码
.
├── README.md
├── config
│ ├── __init__.py
│ ├── dev.yml
│ └── setting.py
├── requirements.txt
└── src
├── __init__.py
├── common
│ ├── __init__.py
│ └── mysql.py
├── constants
│ ├── __init__.py
│ └── const.py
├── model
│ ├── __init__.py
│ ├── base_option.py
│ └── student.py
├── routes.py
├── server.py
├── service
│ ├── __init__.py
│ └── student_svc.py
├── utils
│ ├── __init__.py
│ ├── request_utils.py
│ ├── response_utils.py
│ └── time_utils.py
└── views
├── __init__.py
├── status.py
└── student_api.py
项目内容
- 启动入口 server.py
# server.py
from aiohttp import web
from config.setting import config
from src import create_app
app = create_app()
if __name__ == "__main__":
web.run_app(app, host='127.0.0.1', port=config['base']['port'])
- 初始化 init.py(与server.py目录同级)
用于初始化路由、数据库、日志、配置信息等操作
import asyncio
from aiohttp import web
# 版本号
from src.common.mysql import Mysql
from config.setting import config
from src.routes import set_up
from src.utils.request_utils import request_id
from src.utils.response_utils import success, error
__version__ = '1.0.0'
def create_app():
# 增加响应处理
my_app = web.Application(middlewares=[middleware])
my_app['config'] = config
# 初始化mysql
asyncio.ensure_future(Mysql.init_engine())
# 初始化路由
set_up(my_app)
return my_app
@web.middleware
async def middleware(request, handler):
_request_id = request_id(request)
try:
request['request_id'] = _request_id
data = await handler(request)
if data:
resp = success(_request_id, data)
else:
resp = success(_request_id)
except ValueError as e:
resp = error(_request_id, "RC_INVALID_PARAM", e.args[0] if e.args else 'params invalid')
except Exception as e1:
resp = error(_request_id, "RC_INTERNAL_ERROR", e1.args[0] if e1.args else 'inner error')
return web.Response(body=resp.encode(), content_type='application/json')
- 访问api 初始化 student_api.py
from src.views.student_api import students, student, save_student
from .views.status import status
def set_up(app):
# 项目状态
app.router.add_get('/status', status, name='status')
# 获取全部学生信息
app.router.add_get('/students', students, name='students')
# 获取单个学生信息
app.router.add_get('/student', student, name='student')
# 获取单个学生信息
app.router.add_post('/save', save_student, name='save_student')
- api业务处理
import asyncio
from src.service import student_svc
# 使用asyncio.shield 防止请求过程取消请求造成数据库阻塞
async def students(request):
return await asyncio.shield(student_svc.students())
async def save_student(request):
data = await request.json()
return await asyncio.shield(student_svc.save_student(data))
async def student(request):
st_id = request.match_info.get("id")
return await asyncio.shield(student_svc.single_student(st_id))
- 数据库链接配置
import aiomysql.sa
from config.setting import config
mysql_config = config["mysql"]
class Mysql(object):
engine = None
def __init__(self, engine):
self.engine = engine
@staticmethod
async def init_engine():
Mysql.engine = await aiomysql.sa.create_engine(user=mysql_config["user"], db=mysql_config["database"],
host=mysql_config["host"], password=mysql_config["password"],
autocommit=True, connect_timeout=10, echo=False)
@staticmethod
async def get_engine():
if not Mysql.engine:
await Mysql.init_engine()
return Mysql.engine
- 数据库操作基本配置
from src import Mysql
class BaseOption:
@classmethod
async def insert_or_update(cls, *sqls):
try:
engine = await Mysql.get_engine()
async with engine.acquire() as conn:
await conn.connection.autocommit(False)
trans = None
try:
trans = await conn.begin()
for sql in sqls:
await conn.execute(sql)
await trans.commit()
return "success"
except Exception as t:
if trans:
await trans.rollback()
raise Exception(t)
finally:
await conn.connection.autocommit(True)
conn.connection.close()
except Exception as e:
raise Exception(e)
@classmethod
async def query(cls, sql):
try:
engine = await Mysql.get_engine()
async with engine.acquire() as conn:
try:
return await conn.execute(sql)
except Exception as exec_err:
raise Exception(exec_err)
finally:
conn.connection.close()
except Exception as e:
raise Exception(e)
- 数据库操作
from sqlalchemy import BigInteger, SMALLINT, Integer
from sqlalchemy import String
from sqlalchemy import Column
from sqlalchemy import Table
from sqlalchemy import MetaData
from src.constants import const
from src.model.base_option import BaseOption
from src.utils.time_utils import current_sec_time
class Student(BaseOption):
def __init__(self):
meta = MetaData()
self.student_tb = Table(
"student",
meta,
Column(const.Student.ID, Integer, primary_key=True),
Column(const.Student.NAME, String(40)),
Column(const.Student.CLASS, String(40)),
Column(const.Student.STATUS, SMALLINT, default=1),
Column(const.Student.CREATE_TIME, BigInteger, default=current_sec_time()),
Column(const.Student.UPDATE_TIME, BigInteger, default=current_sec_time())
)
async def insert_student(self, student_obj):
sql = self.student_tb.insert().values(
name=student_obj.get("name"),
st_class=student_obj.get("st_class"),
status=student_obj.get("status", 1),
create_time=current_sec_time(),
update_time=current_sec_time())
return await self.insert_or_update(sql)
async def get_update_student_sql(self, student_obj):
sql = self.student_tb.update().values(
name=student_obj.get("name"),
st_class=student_obj.get("st_class"),
status=student_obj.get("status"),
create_time=current_sec_time(),
update_time=current_sec_time()
).where(self.student_tb.c.id == student_obj.get(const.Student.ID))
return self.query(sql)
async def status(self, st_id):
try:
sql = self.student_tb.delete().where(self.student_tb.c.id == st_id)
return await self.insert_or_update(sql)
except Exception as e:
raise Exception('update status failed, error info:{}.'.format(e))
async def select_student_by_st_id(self, st_id):
try:
sql = self.student_tb.select().where(self.student_tb.c.id == st_id)
cursor = await self.query(sql)
records = await cursor.fetchall()
return [dict(r) for r in records] if records else None
except Exception as e:
raise Exception('select_by_id failed, error info:{}.'.format(e))
async def select_students(self):
try:
sql = self.student_tb.select()
print(sql)
cursor = await self.query(sql)
records = await cursor.fetchall()
return [dict(r) for r in records] if records else None
except Exception as e:
raise Exception('select_by_id failed, error info:{}.'.format(e))
具体代码移步github aiohttp-example
网友评论