美文网首页
Celery消息队列的配置简例(模拟优先级)

Celery消息队列的配置简例(模拟优先级)

作者: Mr韩_xianfeng | 来源:发表于2024-01-02 16:23 被阅读0次

    本例将使用django+celery+redis演示如何配置celery的任务队列以及如何使用他们
    django==3.0.4
    celery==4.4.0
    redis==3.3.10
    大家都知道redis不像rabbitmq支持任务的优先级,但可以使用不同的队列来区分不同的任务类型,比如优先级较高且耗时较短的任务都放到queue1,而耗时较长的可以放到queue2。这样不同类型的任务就不再互相干扰了,这不是个完美的方案,但可以满足大部分场景了。

    配置

    方法1

    CELERY_DEFAULT_QUEUE = "default"
    CELERY_QUEUES = {
        "default": {
            "exchange": "default",
            "exchange_type": "direct",
            "routing_key": "default"
        },
        "queue0": {
            "routing_key": "P0",
            "exchange": "P0",
            "exchange_type": "P0",
        }
    }
    

    方法2

    from kombu import Queue,Exchange
    CELERY_QUEUES = ( 
      Queue('default', Exchange('default'), routing_key='default'), 
      Queue('P0', Exchange('P0'), routing_key='P0'),
      ) 
    
    

    其中路由键支持正则匹配,比如routing_key="WEB.” 则以WEB开头的路有都归到default队列.
    其它参数自行查阅吧

    调用

    @app.task(name='DEMO_P0', queue='P0')
    def task_p0():
        print('this is p0')
        return 0
    

    注:也可以在调用时指定队列
    r = task1.apply_async(args=(1, 2), queue='queue1', routing_key='queue1')
    亦或使用CELERY_ROUTES配置路由

    启动

    celery -A core worker -E -l info
    celery -A core worker -E -l info -Q P0
    这里第一行是消费默认的default队列,第二行是单独消费P0队列
    注:这里启动时我没有指定-P eventlet,因为测试时发现会提示django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread.
    网上有几个解法如版本问题、关闭线程数据库连接等都不太好使,也没再深入研究。如果哪位知晓也烦请告知,感谢。

    相关文章

      网友评论

          本文标题:Celery消息队列的配置简例(模拟优先级)

          本文链接:https://www.haomeiwen.com/subject/nhgpndtx.html