Django 框架需要一个消息队列来支持分布式任务处理,大部分人会选择 celery。但 celery 不支持 windows。
之前,用过funboost,功能强大,但是其代码充斥着拼写错误,让人难以忍受。
今天看了 Dramatiq, 发现完全满足项目需求。于是上手尝试了一下。
其主要特性有:
- 如果用了普罗米修斯中间件,则可以通过
http://127.0.0.1:9191/
,查看workers状态。(在windows上试了,响应不稳定) - 代码变动后,可向主进程发送 SIGHUP 信号,从而让 workers 更新状态。(但windows下,没有有向进程发送信号的便捷手段)
$ kill -s HUP 13047
- 可以加入
--watch
参数,实现 hot-reload, 但生产环境不推荐。 -
max_retries
:设定最大重试次数。 -
retry_when
:设定重试条件。 -
max_age
:任务寿命毫秒。
当消息超过最大重试次数或者在队列里时长超过寿命,则会进入死信区,在这里最多保留 7 天,然后被丢弃。
-
time_limit
:执行时长上限(毫秒),默认为 10 分钟。超时,则会报出TimeLimitExceeded
异常。 -
priority
: 任务优先级, 默认为 0, 越小优先级越高。 -
send_with_options()
: 提供任务调度能力 -
方便与Django框架结合,在admin中能够看到任务执行历史。
Actors
Actor 的并行
你的 actor 会跟其它 actors 并发执行,注意操作相同资源时,可能出现的竞争关系。
简单消息参数
给 actor 传递的消息应该是能被 json 序列化的数据,例如 bool, int, float, bytes, string, list and dict.
幂等性
受网络、硬件、电源等因素影响,一个 actor 可能会反复接收到相同消息,所以你要保证多次执行 actor 的安全性。
Messages 消息
brokers 接收到消息后,会持久化到硬盘。broker 重启后,会重新激活消息。
消息处理结果,可以通过 Results 中间件,返回 Redis 或者 Memcached 中。
优雅的处理中断
import dramatiq
from dramatiq.middleware import Interrupt
@dramatiq.actor(max_retries=3, notify_shutdown=True)
def long_running_task():
try:
setup()
do_work()
except Shutdown:
cleanup()
raise
定时任务
可以跟apscheduler
结合,实现定时执行任务的功能。
网友评论