美文网首页pythonPython
Python Celery 队列

Python Celery 队列

作者: Exqlnet | 来源:发表于2018-09-01 19:17 被阅读140次

    前言

    最近某个Flask Web项目需要定时读取数据库,并对数据进行更新,想了想还是有自己的实现办法的:
    引入threading

    from threading import Thread
    thr = Thread(target=timed_task)
    thr.start()
    

    这样就通过一个函数构造了一个线程,每次在manager.run()之前加上这几行代码,就大概可以实现要求了
    但是存在问题,Python有GIL全局锁限制,一个Python进程实际上总是一个线程在跑,无法充分使用CPU(可能还可以使用多进程Process,但博主不太熟悉这方面,没有尝试),自然性能上会有很大问题。
    并且还有一个问题是,博主使用uwsgi在服务器上部署Web应用,所以在这里写出这样的代码就太过牵强了,我想应该有一个工具能够不断的监控、分配任务、执行任务,它就是Celery

    Celery介绍

    Celery翻译为“队列”,它的工作过程也自然离不开这个概念。

    Celery里有两个模块:worker和beat

    worker:用于执行队列中的任务
    beat:用于定时分配任务
    这两个模块可以同时启动,也可以分别启动

    任务可以有两个来源

    代码内调用:将需要放入执行队列的任务函数import进来
    beat定时派发:在配置文件里设置好需要调用的任务函数和调用周期,beat就会自动派发任务到队列里了

    Celery安装

    Celery使用之前需要配置消息中间件

    一般是用Redis数据库来通讯更加方便,需要本地安装redis服务并启动,具体操作请参考其他博客和官方文档。
    Python也需要安装redis支持

    pip install redis
    

    用pip可以非常简单地安装

    pip install celery
    

    Celery最简单的Demo

    我们先来看看这个代码,它是一个最简单的celery程序
    celery_demo.py

    import celery
    import time
    
    worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")
    
    
    @worker.task
    def hello():
        return "hello,{}".format(time.time())
    

    这样每次处理hello这个任务的时候,就会返回“hello,”加上一个时间戳
    如此而来,我们只是定义好了任务函数和worker(celery对象)

    我们还需要创建一个py来调用这个模块(当然你也可以直接在命令行把这个模块import进去)
    celery_schedule.py

    from celery_demo import hello
    
    hello.delay()
    

    每运行一次celery_schedule.py,一个hello任务就会被放入任务队列,等待worker执行

    现在我们已经将它运行了一次,我们需要开启worker来执行它
    在命令行运行如下代码来启动worker:

    celery worker -A celery_demo.worker -l info
    // 后面的-l info参数意思是开启日志模式,所有消息将会打印在命令行
    

    可以发现命令行已经出现结果了

    [2018-09-01 19:02:32,218: INFO/MainProcess] Connected to redis://localhost:6379//
    [2018-09-01 19:02:32,224: INFO/MainProcess] mingle: searching for neighbors
    [2018-09-01 19:02:33,238: INFO/MainProcess] mingle: all alone
    [2018-09-01 19:02:33,274: INFO/MainProcess] celery@exqlnet-PC ready.
    [2018-09-01 19:02:33,275: INFO/MainProcess] Received task: celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7]  
    [2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29]  
    [2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f]  
    [2018-09-01 19:02:33,285: INFO/ForkPoolWorker-1] Task celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7] succeeded in 0.00840658400557004s: 'hello,1535799753.2767727'
    [2018-09-01 19:02:33,285: INFO/ForkPoolWorker-3] Task celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f] succeeded in 0.007285808002052363s: 'hello,1535799753.278022'
    [2018-09-01 19:02:33,290: INFO/ForkPoolWorker-2] Task celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29] succeeded in 0.013728965997870546s: 'hello,1535799753.2767704'
    

    Celery定时任务

    那么既然已经可以简单实现任务分配和执行了,那么如何定时分配任务呢?
    我们在celery_demo.py里加一些东西:

    import celery
    import time
    from datetime import timedelta
    
    worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")
    
    
    class Config:
        CELERYBEAT_SCHEDULE = {
            'update_info': {
                'task': 'celery_demo.hello',
                "schedule": timedelta(seconds=3),
            }
        }
    
    
    worker.config_from_object(Config)
    
    
    @worker.task
    def hello():
        return "hello,{}".format(time.time())
    

    类Config是一个配置类,CELERYBEAT_SCHEDULE是用来配置定时任务的,具体的直接看代码
    worker.config_from_object()传入一个配置类或对象即可加载配置

    一开始就说到Celery的beat才是定时安排任务的工具,所以我们需要用beat来启动定时,在命令行运行以下代码:

    celery beat -A celery_demo.worker -l info
    

    启动beat,现在应该可以在命令行看到如下信息:

    LocalTime -> 2018-09-01 19:12:53
    Configuration ->
        . broker -> redis://localhost:6379//
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%INFO
        . maxinterval -> 5.00 minutes (300s)
    [2018-09-01 19:12:53,821: INFO/MainProcess] beat: Starting...
    [2018-09-01 19:12:53,839: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
    [2018-09-01 19:12:56,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
    [2018-09-01 19:12:59,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
    [2018-09-01 19:13:02,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
    [2018-09-01 19:13:05,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
    

    说明任务已经分配了,由于停留时间,我这里已经分配了5个任务到队列里

    再次运行启动worker命令

    celery worker -A celery_demo.worker -l info
    

    可以在屏幕上看到以下信息了

    [tasks]
      . celery_demo.hello
    
    [2018-09-01 19:14:40,146: INFO/MainProcess] Connected to redis://localhost:6379//
    [2018-09-01 19:14:40,152: INFO/MainProcess] mingle: searching for neighbors
    [2018-09-01 19:14:41,163: INFO/MainProcess] mingle: all alone
    [2018-09-01 19:14:41,172: INFO/MainProcess] celery@exqlnet-PC ready.
    [2018-09-01 19:14:41,336: INFO/MainProcess] Received task: celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a]  
    [2018-09-01 19:14:41,338: INFO/MainProcess] Received task: celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3]  
    [2018-09-01 19:14:41,340: INFO/MainProcess] Received task: celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e]  
    [2018-09-01 19:14:41,347: INFO/MainProcess] Received task: celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69]  
    [2018-09-01 19:14:41,350: INFO/MainProcess] Received task: celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30]  
    [2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72]  
    [2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78]  
    [2018-09-01 19:14:41,353: INFO/MainProcess] Received task: celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958]  
    [2018-09-01 19:14:41,354: INFO/MainProcess] Received task: celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc]  
    [2018-09-01 19:14:41,355: INFO/MainProcess] Received task: celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682]  
    [2018-09-01 19:14:41,359: INFO/ForkPoolWorker-2] Task celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69] succeeded in 0.011140925002109725s: 'hello,1535800481.348562'
    [2018-09-01 19:14:41,360: INFO/ForkPoolWorker-4] Task celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a] succeeded in 0.021171232001506723s: 'hello,1535800481.3396409'
    [2018-09-01 19:14:41,361: INFO/ForkPoolWorker-2] Task celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30] succeeded in 0.0003823369988822378s: 'hello,1535800481.3609214'
    [2018-09-01 19:14:41,362: INFO/ForkPoolWorker-3] Task celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3] succeeded in 0.02279239599738503s: 'hello,1535800481.33969'
    [2018-09-01 19:14:41,362: INFO/ForkPoolWorker-4] Task celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72] succeeded in 0.0008120330021483824s: 'hello,1535800481.3620644'
    [2018-09-01 19:14:41,363: INFO/ForkPoolWorker-2] Task celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78] succeeded in 0.000599899998633191s: 'hello,1535800481.362508'
    [2018-09-01 19:14:41,364: INFO/ForkPoolWorker-4] Task celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958] succeeded in 0.0004470089988899417s: 'hello,1535800481.3638816'
    [2018-09-01 19:14:41,364: INFO/ForkPoolWorker-3] Task celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc] succeeded in 0.000407947001804132s: 'hello,1535800481.363944'
    [2018-09-01 19:14:41,364: INFO/ForkPoolWorker-2] Task celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682] succeeded in 0.00035140299587510526s: 'hello,1535800481.3644059'
    [2018-09-01 19:14:41,365: INFO/ForkPoolWorker-1] Task celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e] succeeded in 0.016904400996281765s: 'hello,1535800481.3485875'
    

    相关文章

      网友评论

        本文标题:Python Celery 队列

        本文链接:https://www.haomeiwen.com/subject/nemuwftx.html