美文网首页
django中使用websocket

django中使用websocket

作者: 早安明天_8731 | 来源:发表于2022-01-26 14:38 被阅读0次

    第一步:

    安装所需的pip包:channels,channels-redis

    第二步:

    修改settings.py添加相应配置,INSTALLED_APPS中加入channels,并加入CHANNEL_LAYERS

    # channles
    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [(REDIS_HOST, REDIS_PORT)],
                "symmetric_encryption_keys": [SECRET_KEY],
                "prefix": 'cybercity:asgi',
            },
        },
    }
    

    第三步修改asgi的入口文件如下

    import os
    
    from django.core.asgi import get_asgi_application
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'x.settings')
    
    django_asgi_app = get_asgi_application()
    
    application = ProtocolTypeRouter({
        'websocket': SessionMiddlewareStack(
            URLRouter(
                practice.routing.websocket_urlpatterns,
            ),
        ),
        "http": django_asgi_app,
    })
    

    第四步添加consumer.py文件,放到对应的app里

    (1)consumer.py的内容如下所示:

    from channels.generic.websocket import AsyncJsonWebsocketConsumer
    from practice import status
    from practice.exceptions import ClientError
    
    
    class NoticeConsumer(AsyncJsonWebsocketConsumer):
        """ 考试事件消息推送 """
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # 定义消息推送房间
            self.room = "notice"
            self.is_closed = False
    
        async def connect(self):
            """
            Called when the websocket is handshaking as part of initial connection.
            """
            # self.room_name = self.scope["url_route"]["kwargs"]["login_id"]
            await self.accept()
    
        async def receive_json(self, content, **kwargs):
            """
            Called when we get a text frame. Channels will JSON-decode the payload
            for us and pass it as the first argument.
            """
            try:
                command = content.get("command", None)
                if command is None:
                    raise ClientError("COMMAND_IS_ACCESSARY")
                if command == "join":
                    await self.join_room()
                elif command == "leave":
                    await self.leave_room()
                else:
                    raise ClientError("COMMAND_ERROR")
    
            except ClientError as e:
                await self.send_json({"error": e.code})
    
        async def disconnect(self, code):
            """
            Called when the WebSocket closes for any reason.
            """
            self.is_closed = True
            try:
                await self.leave_room()
            except ClientError:
                pass
    
        async def join_room(self):
            """
            Called by receive_json when someone sent a join command.
            """
            if status.NOTIFY_USERS_ON_ENTER_OR_LEAVE_ROOMS:
                # 开启通知
                await self.channel_layer.group_send(self.room, {
                    "type": "notice.join",
                    "room": self.room
                })
            # 加入消息组
    
            await self.channel_layer.group_add(
                self.room,
                self.channel_name,
            )
            # 给连接的ws客户端发送确认消息
            await self.send_json({
                "join": self.room
            })
    
        async def leave_room(self):
            """
            Called by receive_json when someone sent a leave command.
            """
            if status.NOTIFY_USERS_ON_ENTER_OR_LEAVE_ROOMS:
                await self.channel_layer.group_send(
                    self.room, {
                        "type": "notice.leave",
                        "room": self.room
                    })
            # 离开消息组
            await self.channel_layer.group_discard(
                self.room,
                self.channel_name,
            )
            # 给连接的ws客户端发送确认消息
            # 如果ws客户端连接断开则不发送确认消息
            if not self.is_closed:
                await self.send_json({
                    "leave": self.room,
                })
    
        async def notice_join(self, event):
            """
            Called when someone has joined our chat.
            """
            await self.send_json(
                {
                    "msg_type": status.MSG_TYPE_ENTER,
                    "room": event["room"]
                }, )
    
        async def notice_leave(self, event):
            """
            Called when someone has left our chat.
            """
            await self.send_json(
                {
                    "msg_type": status.MSG_TYPE_LEAVE,
                    "room": event["room"],
                }, )
    
        async def notice_message(self, event):
            """
            Called when someone has messaged our chat.
            """
            await self.send_json(
                {
                    "msg_type": status.MSG_TYPE_MESSAGE,
                    "room": self.room,
                    "message": event["message"],
                    "created": event["created"],
                    "topic": event["topic"],
                }, )
    

    (2)status.py的内容如下所示:

    NOTIFY_USERS_ON_ENTER_OR_LEAVE_ROOMS = True
    
    MSG_TYPE_MESSAGE = 0  # For standard messages
    MSG_TYPE_WARNING = 1  # For yellow messages
    MSG_TYPE_ALERT = 2  # For red & dangerous alerts
    MSG_TYPE_MUTED = 3  # For just OK information that doesn't bother users
    MSG_TYPE_ENTER = 4  # For just OK information that doesn't bother users
    MSG_TYPE_LEAVE = 5  # For just OK information that doesn't bother users
    
    MESSAGE_TYPES_CHOICES = (
        (MSG_TYPE_MESSAGE, 'MESSAGE'),
        (MSG_TYPE_WARNING, 'WARNING'),
        (MSG_TYPE_ALERT, 'ALERT'),
        (MSG_TYPE_MUTED, 'MUTED'),
        (MSG_TYPE_ENTER, 'ENTER'),
        (MSG_TYPE_LEAVE, 'LEAVE'),
    )
    
    MESSAGE_TYPES_LIST = [
        MSG_TYPE_MESSAGE,
        MSG_TYPE_WARNING,
        MSG_TYPE_ALERT,
        MSG_TYPE_MUTED,
        MSG_TYPE_ENTER,
        MSG_TYPE_LEAVE,
    ]
    

    (3)exceptions.py的内容如下所示:

    class BaseError(Exception):
        """ 自定义异常基类 """
        def __init__(self, code):
            super().__init__(code)
            self.code = code
    
    
    class ClientError(BaseError):
    

    第五步添加routing.py文件,即websocket接口路由相关

    from django.urls import path
    
    from practice.consumers import NoticeConsumer, BroadcastConsumer
    
    websocket_urlpatterns = [
        path("stream/notice/", NoticeConsumer.as_asgi()),
        path("stream/broadcast/", BroadcastConsumer.as_asgi()),
    ]
    

    推送消息

    import os
    import json
    import datetime
    
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    from django_redis import get_redis_connection
    
    
    channel_layer = get_channel_layer()
    redis_conn = get_redis_connection("default")
    
    
    def vulnerability_approved(message):
        """管理员漏洞审核通过信息展示界面推送通知"""
        async_to_sync(channel_layer.group_send)(
            "broadcast", {
                "type": "notice.message",
                "message": message,
                "created": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "topic": "vulnerability_approved"
            })
    
    

    相关文章

      网友评论

          本文标题:django中使用websocket

          本文链接:https://www.haomeiwen.com/subject/ovsnhrtx.html