Consumers

作者: xncode | 来源:发表于2019-06-26 14:50 被阅读0次

    不用写事件循环,所设置的函数会在相应的事件发生时被调用
    方便写异步或同步代码

    基础

    consumer包含了一系列的对应消息类型的函数,把其中的 . 和 _ 相互转换

    AsyncConsumer

    def __init__(self, scope):
        self.scope = scope
    
    async def __call__(self, receive, send):
    

    调用时,设置channel_layer,调用channel_layer的new_channel方法构造并设置channel_name,传入channel_name参数构建channel_receive

    对于本类 异步 则send即为参数中传入的send

    监听:

                await await_many_dispatch(
                    [receive, self.channel_receive], self.dispatch
                )
    

                # 针对没有channel_layer的
                await await_many_dispatch([receive], self.dispatch)
    

    在dispatch函数中根据message["type"]找到类中相应的处理函数

    SyncConsumer

    相比异步的:
    send函数用async_to_sync转换了
    dispatch加了database_sync_to_async转换

    通用类

    AsyncHttpConsumer

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.body = []
    

    使用http_request处理http.request事件,在接收了完整的body后调用handle

    async def http_request(self, message):
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            finally:
                await self.disconnect()
                raise StopConsumer()
    

    另有http_disconnect处理http.disconnect事件

    如想在连接关闭之际做出处理,可在disconnect函数中实现

    关于发送响应,提供了send_response函数,
    在其中调用send_headers,发送http.response.start消息
    在其中调用send_body,发送http.response.body消息

    AsyncWebsocketConsumer

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.groups is None:
            self.groups = []
    

    实现了websocket_connect
    被调用时如果设置了groups会调用channel_layer.group_add
    之后调用connect,调用accept

    实现了websocket_receive
    根据message中具体内容,调用receive,传入message["text"]或message["bytes"]

    实现了websocket_disconnect
    被调用时如果设置了groups会调用channel_layer.group_discard
    之后调用disconnect

    实现了send,用于发送消息

    如果想自定义函数,可重载connect accept receive close

    WebsocketConsumer

    和上述异步类似,注意写法转换

    关于异步 async

    def await_many_dispatch(consumer_callables, dispatch):
    """
    Given a set of consumer callables, awaits on them all and passes results
    from them to the dispatch awaitable as they come in.
    """
    # Start them all off as tasks
    loop = asyncio.get_event_loop()
    tasks = [
    loop.create_task(consumer_callable())
    for consumer_callable in consumer_callables
    ]
    try:
    while True:
    # Wait for any of them to complete
    await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Find the completed one(s), yield results, and replace them
    for i, task in enumerate(tasks):
    if task.done():
    result = task.result()
    await dispatch(result)
    tasks[i] = asyncio.ensure_future(consumer_callablesi)
    finally:
    # Make sure we clean up tasks on exit
    for task in tasks:
    task.cancel()
    try:
    await task
    except CancelledError:
    pass

    相关文章

      网友评论

          本文标题:Consumers

          本文链接:https://www.haomeiwen.com/subject/bvgqcctx.html