美文网首页程序员
Django3.1异步视图+aiomysql+channels实

Django3.1异步视图+aiomysql+channels实

作者: 梟遙書眚 | 来源:发表于2020-09-24 13:35 被阅读0次

    Django3.1异步视图+aiomysql+channels实现小游戏

    最近有需求需要开发一款网页答题小游戏,实现实时对战的功能,首先想到使用tornado高并发异步框架去实现websocket,可是就是这个时候django3.1正式版发布了,说他来的早不如说他来得巧,既然方便强大的django支持异步视图了那为什么还要去花时间研究tornado,django3.x实现asgi接口自然可以实现websocket,但是考虑开发成本,最终还是选择使用channels实现websocket。考虑到公司业务,这里就只写websocket的实现吧。

    项目依赖的主要环境

    1. python >= 3.7
    2. django >= 3.1
    3. channels >= 2.4
    4. aiomysql
    5. uvicorn

    channels配置

    1. 假设python和django环境已经就绪且项目已经初始化

    2. 安装channels

    pip install channels

    3. 在app中注册channels

    # settings.py
    INSTALLED_APPS = (
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.sites',
        ...
        'channels',
    )
    

    4. 创建路由配置

    在工程目录下创建routing.py(myproject/routing.py和settings.py同级)

    # myproject/routing.py
    from channels.auth import AuthMiddlewareStack
    # 继承settings中的allow host
    from channels.security.websocket import AllowedHostsOriginValidator
    from channels.routing import ProtocolTypeRouter, URLRouter
    
    application = ProtocolTypeRouter({
        # (http->django views is added by default)
        'websocket': AllowedHostsOriginValidator(
            AuthMiddlewareStack(  # 中间件
                URLRouter(
                # 这里配置websocket的路由
                )
            ),
        )
    })
    

    5. 配置路由指向并且启用通道层

    通道层可以理解为为每一个ws连接做一个唯一映射存在全局,这样可以在django的任何地方使用通道层给指定的连接发送消息
    使用通道层需要安装channels_redis:pip install channels_redis

    # settings.py
    ASGI_APPLICATION = 'myproject.routing.application'
    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [('127.0.0.1', 6379)],
            },
        },
    }
    

    6. 创建websocket的app

    1. 注册app
    # settings.py
    INSTALLED_APPS = (
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.sites',
        ...
        'app',
        'channels',
    )
    
    2. 在app中创建消费者文件(consumers.py)
    # app/consumers.py
    import json
    from channels.generic.websocket import AsyncWebsocketConsumer
    
    class ChatConsumer(AsyncWebsocketConsumer):
        
        # 建立连接的回调
        async def connect(self):
            self.room_name = self.scope['url_route']['kwargs']['room_id']
            self.room_group_name = 'chat_%s' % self.room_id
    
            # Join room group
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
    
            await self.accept()
    
        # 断开连接的回调
        async def disconnect(self, close_code):
            # Leave room group
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )
    
        # 收到消息的回调
        async def receive(self, text_data):
            text_data_json = json.loads(text_data)
            message = text_data_json['message']
    
            # Send message to room group
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'chat_message',
                    'message': message
                }
            )
    
        # 发送消息指定的处理方法
        async def chat_message(self, event):
            message = event['message']
    
            # Send message to WebSocket
            await self.send(text_data=json.dumps({
                'message': message
            }))
    
    3. 在app中创建websocket路由文件(routing.py)
    1.创建路由文件
    # app/routing.py
    from django.urls import re_path
    
    from answer_game import consumers
    
    websocket_urlpatterns = [
        re_path(r'ws/game/controller/(?P<room_id>\w+)/(?P<user_id>\d+)$', consumers.GameController),
    ]
    
    2. 创建自定义中间件
    class WebSocketAuthMiddleware:
        def __init__(self, inner):
            # 存储通过的ASGI应用程序
            self.inner = inner
    
        def __call__(self, scope):
    
            # 关闭旧的数据库连接,以防止使用超时的连接
            close_old_connections()
    
            # 自定义校验逻辑,获取子协议内容
            protocol = dict(scope['headers']).get(b'sec-websocket-protocol', b'').decode()
            # 处理子协议
            # 这里塞进scope的值可以在消费者方法中直接获取
            # 关于scope的更多信息可以参考官方文档 https://channels.readthedocs.io/en/latest/topics/consumers.html?highlight=scope#scope
            accept = False if not protocol else True
            
            # 直接返回内部应用程序并让它继续运行
            return self.inner(dict(scope, accept=accept))
    
    2. 注册路由,使用自定义中间件
    # myproject/routing.py
    # 这里是自定义中间件,也可以换成自带的auth中间件,因为本次使用了websocket子协议protocol,所以使用自定义中间件做安全校验
    from costudy_answer_game.WebSocketAuthMiddleware import WebSocketAuthMiddleware
    # 这里就是自带的auth中间件,没有用到
    from channels.auth import AuthMiddlewareStack
    # 继承settings中的allow host
    from channels.security.websocket import AllowedHostsOriginValidator
    from channels.routing import ProtocolTypeRouter, URLRouter
    from app import routing
    
    application = ProtocolTypeRouter({
        # (http->django views is added by default)
        'websocket': AllowedHostsOriginValidator(
            WebSocketAuthMiddleware(
                URLRouter(
                    # 这里配置websocket的路由
                    routing.websocket_urlpatterns
                )
            ),
        )
    })
    

    7. 修改消费者方法接受子协议

    # app/consumers.ChatConsumer.connect
    await self.accept('your_protocol')
    

    8. 创建asgi文件

    在工程目录下创建asgi.py

    # myproject/asgi.py
    import os
    import django
    
    from django.core.asgi import get_asgi_application
    from channels.routing import get_default_application
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
    
    application = get_asgi_application()
    
    django.setup()
    ws_application = get_default_application()
    

    到这里websocket的配置就已经完成里


    使用aiomysql

    django3.1目前还不支持异步orm,虽然官方文档说明可以使用sync_to_async转换同步方法为异步方法从而使用同步orm,但是实际开发中会遇到很多复杂的聚合查询等,此时再使用这种方法会很麻烦,而且甚至使用不了(是我使用不了,可能是我使用方法不对),所以我选择了aiomysql,aiomysql的官方文档在这 https://aiomysql.readthedocs.io/en/latest/connection.html#

    当然也可以使用异步orm框架tortoise-orm,附上文档连接 https://tortoise-orm.readthedocs.io/en/latest/
    但是由于调研时间问题项目里还是使用了原生sql,而且我始终不愿意在django里使用其他的orm,这是我对django orm的尊重,期望django能更快的支持异步orm

    1.安装aiomysql

    pip install aiomysql

    2. 配置aiomysql

    # settings.py
    class DBcontroller:
        db1_engine = None
        db2_engine = None
        isinstance = False
    
        def __new__(cls, *args, **kwargs):
            if cls.isinstance:  # 如果被实例化了
                return cls.isinstance  # 返回实例化对象
            print('connecting to database...')
            loop = asyncio.get_event_loop()
            asyncio.run_coroutine_threadsafe(DBcontroller.connect(), loop)
            cls.isinstance = object.__new__(cls)  # 否则实例化
            return cls.isinstance  # 返回实例化的对象
    
        @staticmethod
        async def connect():
            try:
                db1_aiomysql_config = dict(
                    host=os.environ.get('MYSQLHOST', '127.0.0.1'),
                    port=int(os.environ.get('MYSQLPORT', 3306)),
                    user=os.environ.get('MYSQLUSER', 'root'),
                    password=os.environ.get('MYSQLPWD'),
                    maxsize=100,
                    db='dbname1',
                    # echo=True
                )
                db2_aiomysql_config = db1_aiomysql_config.copy()
                db2_aiomysql_config.update({'db': 'dbname2'})
    
                db1_engine = await aiomysql.create_pool(**study_aiomysql_config)
                db2_engine = await aiomysql.create_pool(**game_aiomysql_config)
                if db1_engine and db2_engine:
                    DBcontroller.db1_engine = db1_engine
                    DBcontroller.db2_engine = db2_engine
                    DBcontroller.connectStatue = True
                    print('connect to mysql success!')
                else:
                    raise ("connect to mysql error ")
            except:
                print('connect error.', exc_info=True)
    
    db = DBcontroller()
    

    3. 使用

    # app/views.py
    from django.shortcuts import  HttpResponse
    async def get_user_id_by_uid(request):
        uid = request.GET.get('uid')
        async with await db.study_engine.acquire() as coon:  # type: aiomysql.connection.Connection
            async with coon.cursor() as cur:  # type: aiomysql.cursors.Cursor
                sql = "SELECT nickname FROM user WHERE id=%s"
                await cur.execute(sql, uid)
                rel = await cur.fetchone()
                if rel:
                    user_id, = rel
                else:
                    user_id = 0
        db.study_engine.release(coon)
        rel = {"user_id":user_id}
        return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')
    

    4. 封装使用

    每一次执行sql都要写这么多代码,太麻烦了,那就封装一下吧
    创建公共方法文件utils.py

    class DBExecute(object):
        def __init__(self, sql: str, params=None, return_type='tuple'):
            """
            执行study数据库的sql
            :param sql: sql语句
            :param params: sql的参数
            :param return_type: 返回的数据类型
            """
            self.sql = sql
            self.params = params
            self.return_type = return_type
    
        async def fetchone(self) -> (tuple, dict):
            async with await db.study_engine.acquire() as conn:  # type: aiomysql.connection.Connection
                if self.return_type == 'dict':
                    async with conn.cursor(aiomysql.DictCursor) as cur:
                        if self.params is None:
                            await cur.execute(self.sql)
                        else:
                            await cur.execute(self.sql, self.params)
                        rel = await cur.fetchone()  # type: dict
                else:
                    async with conn.cursor() as cur:  # type: aiomysql.cursors.Cursor
                        if self.params is None:
                            await cur.execute(self.sql)
                        else:
                            await cur.execute(self.sql, self.params)
                        rel = await cur.fetchone()  # type: tuple
            db.study_engine.release(conn)
            return rel
    
        async def fetchall(self) -> list:
            async with await db.study_engine.acquire() as conn:  # type: aiomysql.connection.Connection
                if self.return_type == 'dict':
                    async with conn.cursor(aiomysql.DictCursor) as cur:  # type: aiomysql.cursors.Cursor
                        if self.params is None:
                            await cur.execute(self.sql)
                        else:
                            await cur.execute(self.sql, self.params)
                        rel = await cur.fetchall()
                else:
                    async with conn.cursor() as cur:  # type: aiomysql.cursors.Cursor
                        if self.params is None:
                            await cur.execute(self.sql)
                        else:
                            await cur.execute(self.sql, self.params)
                        rel = await cur.fetchall()
            db.study_engine.release(conn)
            return rel
    

    这里只封装了一个db的执行方法,在settings.py中配置了两个db,这个是根据业务调整的
    现在再看一下刚刚的视图函数可以怎么写

    # app/views.py
    from django.shortcuts import  HttpResponse
    from common.utils import db
    async def get_user_id_by_uid(request):
        uid = request.GET.get('uid')
        sql = "SELECT nickname FROM user WHERE id=%s"
        rel = await DBExecute(sql, uid).fetchone()
        if rel:
            user_id, = rel
        else:
            user_id = 0
        rel = {"user_id":user_id}
        return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')
    

    启动项目

    uvicorn --host 127.0.0.1 --port 8000 --workers 1 myproject.asgi:ws_application

    总结

    算了,不会总结,有问题就谷歌

    相关文章

      网友评论

        本文标题:Django3.1异步视图+aiomysql+channels实

        本文链接:https://www.haomeiwen.com/subject/xlbzyktx.html