ChannelLayerManager
在获取DEFAULT没有时,引入设置中的backend_class,调用backend_class(**config),之后返回
BaseChannelLayer
def __init__(self, expiry=60, capacity=100, channel_capacity=None):
self.expiry = expiry
self.capacity = capacity
self.channel_capacity = channel_capacity or {}
InMemoryChannelLayer
def __init__(
self,
expiry=60,
group_expiry=86400,
capacity=100,
channel_capacity=None,
**kwargs
):
super().__init__(
expiry=expiry,
capacity=capacity,
channel_capacity=channel_capacity,
**kwargs
)
self.channels = {}
self.groups = {}
self.group_expiry = group_expiry
加入组
async def group_add(self, group, channel):
self.groups.setdefault(group, {})
self.groups[group][channel] = time.time()
退出组
async def group_discard(self, group, channel):
if group in self.groups:
if channel in self.groups[group]:
del self.groups[group][channel]
if not self.groups[group]:
del self.groups[group]
发送消息
async def group_send(self, group, message):
self._clean_expired()
# Send to each channel
for channel in self.groups.get(group, set()):
try:
await self.send(channel, message)
except ChannelFull:
pass
在每个consumer的实现中,已经监听了附带了channel_name的receive,这里发送之后,再看reve
网友评论