layers

Author: xncode | Published: 2019-06-26 16:34

    ChannelLayerManager

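    When the DEFAULT layer is requested and has not been created yet, the manager imports the backend_class named in the settings, calls backend_class(**config), and returns the resulting instance.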
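
The lazy lookup described above can be sketched roughly as follows. This is a minimal illustration, not the Channels implementation: `LazyLayerManager`, the `SETTINGS` dict, and the use of `types.SimpleNamespace` as a stand-in backend are all assumptions made for the example.

```python
import importlib

# Hypothetical settings, shaped like a channel-layer configuration entry.
# The BACKEND path points at a stdlib class purely as a stand-in for a
# real layer class.
SETTINGS = {
    "default": {
        "BACKEND": "types.SimpleNamespace",
        "CONFIG": {"expiry": 60, "capacity": 100},
    }
}


class LazyLayerManager:
    """Illustrative only: builds a backend on first access and caches it."""

    def __init__(self, settings):
        self.settings = settings
        self.backends = {}

    def __getitem__(self, alias):
        if alias not in self.backends:
            entry = self.settings[alias]
            # Split "package.module.ClassName" into module path and class name.
            module_path, class_name = entry["BACKEND"].rsplit(".", 1)
            module = importlib.import_module(module_path)
            backend_class = getattr(module, class_name)
            # Equivalent to backend_class(**config) in the description above.
            self.backends[alias] = backend_class(**entry.get("CONFIG", {}))
        return self.backends[alias]


manager = LazyLayerManager(SETTINGS)
layer = manager["default"]
```

A second `manager["default"]` lookup returns the cached object instead of constructing a new backend.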

    BaseChannelLayer

    def __init__(self, expiry=60, capacity=100, channel_capacity=None):
        self.expiry = expiry
        self.capacity = capacity
        self.channel_capacity = channel_capacity or {}
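
To illustrate how `capacity` and `channel_capacity` might work together, here is a small sketch. It assumes the keys of `channel_capacity` are glob-style patterns and that the first matching pattern wins. `capacity_for` is a hypothetical helper written for this example, not a method shown in the excerpt above.

```python
import fnmatch


def capacity_for(channel, default_capacity=100, channel_capacity=None):
    """Return the capacity for `channel`, falling back to the default."""
    for pattern, capacity in (channel_capacity or {}).items():
        # Assumption: keys are glob patterns such as "websocket.send*".
        if fnmatch.fnmatchcase(channel, pattern):
            return capacity
    return default_capacity


# Hypothetical per-channel overrides.
overrides = {"http.request": 200, "websocket.send*": 20}

http_capacity = capacity_for("http.request", channel_capacity=overrides)
ws_capacity = capacity_for("websocket.send!abc", channel_capacity=overrides)
other_capacity = capacity_for("chat.message", channel_capacity=overrides)
```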
    

    InMemoryChannelLayer

    def __init__(
        self,
        expiry=60,
        group_expiry=86400,
        capacity=100,
        channel_capacity=None,
        **kwargs
    ):
        super().__init__(
            expiry=expiry,
            capacity=capacity,
            channel_capacity=channel_capacity,
            **kwargs
        )
        self.channels = {}
        self.groups = {}
        self.group_expiry = group_expiry
    

    Joining a group

    async def group_add(self, group, channel):
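        # Create the group's membership dict on first use, then record when
        # this channel joined (needs `import time` at module level).
        self.groups.setdefault(group, {})
        self.groups[group][channel] = time.time()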
    

    Leaving a group

    async def group_discard(self, group, channel):
        if group in self.groups:
            if channel in self.groups[group]:
                del self.groups[group][channel]
            if not self.groups[group]:
                del self.groups[group]
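
The membership bookkeeping in `group_add` and `group_discard` can be tried out in isolation. The sketch below uses plain synchronous functions and a module-level `groups` dict (both simplifications for the example) to show that a group disappears once its last channel leaves.

```python
import time

groups = {}  # group name -> {channel name: time the channel joined}


def group_add(group, channel):
    # Create the group on first use and record the join time.
    groups.setdefault(group, {})[channel] = time.time()


def group_discard(group, channel):
    members = groups.get(group)
    if members is None:
        return  # unknown group: nothing to do
    members.pop(channel, None)
    if not members:
        del groups[group]  # drop empty groups so the dict does not grow


group_add("chat", "channel.a")
group_add("chat", "channel.b")
group_discard("chat", "channel.a")
remaining = sorted(groups["chat"])

group_discard("chat", "channel.b")
group_discard("chat", "channel.b")  # discarding again is a no-op
```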
    

    Sending a message

    async def group_send(self, group, message):
        self._clean_expired()
        # Send to each channel
        for channel in self.groups.get(group, set()):
            try:
                await self.send(channel, message)
            except ChannelFull:
                pass
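
The effect of swallowing `ChannelFull` is easiest to see in a toy model. The sketch below is not the Channels code: `ToyLayer`, its list-based queues, and the locally defined `ChannelFull` are assumptions for illustration. It shows that a channel already at capacity silently misses a group message while the other members still receive it.

```python
import asyncio


class ChannelFull(Exception):
    """Local stand-in for the error raised when a channel is at capacity."""


class ToyLayer:
    def __init__(self, capacity=2):
        self.capacity = capacity
        self.channels = {}  # channel name -> list of pending messages
        self.groups = {}    # group name -> set of channel names

    async def send(self, channel, message):
        queue = self.channels.setdefault(channel, [])
        if len(queue) >= self.capacity:
            raise ChannelFull(channel)
        queue.append(message)

    async def group_send(self, group, message):
        for channel in self.groups.get(group, set()):
            try:
                await self.send(channel, message)
            except ChannelFull:
                pass  # a full channel simply does not get this message


async def demo():
    layer = ToyLayer(capacity=2)
    layer.groups["chat"] = {"a", "b"}
    layer.channels["a"] = ["old-1", "old-2"]  # channel "a" is already full
    await layer.group_send("chat", {"type": "chat.message", "text": "hi"})
    return layer.channels


channels = asyncio.run(demo())
```

Because delivery to a full channel is dropped without an error, a sender cannot tell from `group_send` alone whether every member received the message.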
    

    Every consumer implementation already listens on receive for its own channel_name, so once the message has been sent here, the next thing to look at is reve
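
As a rough picture of the receiving side, the sketch below models a consumer as a loop waiting on its own channel. Everything here is an assumption for illustration: the `asyncio.Queue` stands in for the layer's per-channel queue, `"specific.abc"` is a made-up channel name, and the `None` sentinel exists only so the example terminates.

```python
import asyncio


async def consumer(channel_name, inbox, handled):
    """Hypothetical consumer loop: wait on its own channel, record messages."""
    while True:
        message = await inbox.get()
        if message is None:  # sentinel used only by this sketch
            break
        handled.append((channel_name, message["text"]))


async def demo():
    inbox = asyncio.Queue()  # stand-in for the channel's message queue
    handled = []
    task = asyncio.create_task(consumer("specific.abc", inbox, handled))
    await inbox.put({"type": "chat.message", "text": "hello"})
    await inbox.put(None)
    await task
    return handled


handled = asyncio.run(demo())
```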
