介绍
由于对 celery 在消息队列上实现远程控制的好奇,因而以 revoke
命令(取消任务)为例来研究其中的实现。
源码分析
- worker 端对
revoke
的实现
image.png
- 调用流: 任务的
revoke
-->
request
的terminate
-->
进程/线程pool
的terminate
-->
最终本质还是调用os.kill(pid, signal)
- worker 端
revoke
在消息队列上的实现
image.png
- 调用流:
revoke
注册到Panel
类的类变量data
-->
Panel
的data
赋值给Mailbox.Node
的handlers
,worker
通过on_message
来处理消息队列中的消息,实际上调用的Mailbox.Node
(TODO:
有关kombu/pibox
的实现) 的handle_message
。 - 代码组织理解:注册函数的方式使得添加和减少控制器的函数不具有侵入性(不会改动源代码),并将控制器封装赋值给handlers。
- 注册函数的实现:函数装饰器结合返回类方法实现注册函数的方式值得参考,使用类变量和类方法来实现全局注册,在需要注册隔离时可以采用实例方法,这里通过
type
字段区分不同注册功能函数。
- client 端
revoke
在消息队列上的实现
image.png
- 调用流:
client
调用control
的revoke
方法-->
调用control
的broadcast
方法-->
调用Mailbox
的_broadcast
方法-->
调用Mailbox
的_publish
方法。
网友评论