Python Celery Queues


Author: Exqlnet | Published 2018-09-01 19:17

Preface

Recently a Flask web project of mine needed to read the database on a schedule and update some records. At first I figured I could roll my own solution:
Use threading

from threading import Thread
thr = Thread(target=timed_task)
thr.start()

This builds a thread from a function; put these few lines before manager.run() and the requirement is roughly met.
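As a minimal sketch of what such a timed_task might look like (the update_records helper and the 60-second interval here are hypothetical):

import time
from threading import Thread


def update_records():
    # Placeholder for the actual database update logic
    print("updating records at", time.time())


def timed_task():
    # Poll the database on a fixed interval
    while True:
        update_records()
        time.sleep(60)


# daemon=True so the thread does not block interpreter shutdown
thr = Thread(target=timed_task, daemon=True)
thr.start()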
But this approach has problems. Because of Python's GIL (global interpreter lock), only one thread in a process executes Python bytecode at any moment, so the CPU cannot be fully utilized (multiprocessing's Process might help, but I'm not familiar with it and haven't tried it), and performance suffers.
There is another issue: I deploy the web app with uwsgi, so wedging this code into the app feels far too hacky. I figured there should be a tool that continuously monitors the queue, dispatches tasks, and executes them. That tool is Celery.

About Celery

Celery is a distributed task queue for Python, and everything about how it works revolves around that queue concept.

Celery has two components: worker and beat.

worker: executes the tasks in the queue
beat: dispatches tasks on a schedule
The two can be started separately or together (they can even share a single process, as shown at the end of this post).

Tasks can come from two sources:

Calls from your code: import the task function and call it to push a job onto the queue
Scheduled dispatch by beat: configure which task functions to call and how often, and beat pushes jobs onto the queue automatically

Installing Celery

Celery needs a message broker configured before it can be used.

Redis is a common and convenient choice for this; install and start a local redis server (see other posts or the official documentation for details).
Python also needs the redis client library:

pip install redis

Celery itself installs just as easily with pip:

pip install celery
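Before moving on, a quick sanity check that the Redis server is actually reachable (assuming the default localhost:6379):

import redis

# Ping the local Redis server; this raises ConnectionError if it is not running
r = redis.Redis(host="localhost", port=6379)
print(r.ping())  # True means the broker connection will work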

A Minimal Celery Demo

Let's look at the following code first; it is about the simplest Celery program possible.
celery_demo.py

import celery
import time

# broker: where tasks are queued; backend: where task results are stored
worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")


@worker.task
def hello():
    return "hello,{}".format(time.time())

Now every time the hello task is processed, it returns "hello," plus a timestamp.
So far we have only defined the task function and the worker (the Celery app object).

We also need a .py file that calls into this module (you could just as well import the module in an interactive shell):
celery_schedule.py

from celery_demo import hello

hello.delay()

Each run of celery_schedule.py pushes one hello task onto the queue, where it waits for a worker to execute it.
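Since a result backend is configured, delay() also hands back an AsyncResult that can fetch the task's return value; a minimal sketch (a worker must be running for get() to return):

from celery_demo import hello

result = hello.delay()          # returns an AsyncResult immediately
print(result.get(timeout=10))   # blocks until the worker stores the result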

We have now queued the task once; next we need a worker to execute it.
Run the following command to start a worker:

celery worker -A celery_demo.worker -l info
# -l info sets the log level to INFO, so all messages are printed to the terminal
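(This is the Celery 4 command style used throughout this post; on Celery 5 and later the -A option has to come before the subcommand: celery -A celery_demo.worker worker -l info)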

You can see that results are already showing up in the terminal:

[2018-09-01 19:02:32,218: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:02:32,224: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:02:33,238: INFO/MainProcess] mingle: all alone
[2018-09-01 19:02:33,274: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:02:33,275: INFO/MainProcess] Received task: celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7]  
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29]  
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f]  
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-1] Task celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7] succeeded in 0.00840658400557004s: 'hello,1535799753.2767727'
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-3] Task celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f] succeeded in 0.007285808002052363s: 'hello,1535799753.278022'
[2018-09-01 19:02:33,290: INFO/ForkPoolWorker-2] Task celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29] succeeded in 0.013728965997870546s: 'hello,1535799753.2767704'

Celery Periodic Tasks

Now that simple task dispatch and execution are working, how do we dispatch tasks on a schedule?
Let's add a few things to celery_demo.py:

import celery
import time
from datetime import timedelta

worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")


class Config:
    # Each entry defines one periodic task: which task to run and how often
    CELERYBEAT_SCHEDULE = {
        'update_info': {
            'task': 'celery_demo.hello',       # registered task name
            'schedule': timedelta(seconds=3),  # dispatch every 3 seconds
        }
    }


worker.config_from_object(Config)


@worker.task
def hello():
    return "hello,{}".format(time.time())

The Config class holds the configuration; CELERYBEAT_SCHEDULE is the setting that defines the periodic tasks, as the code shows.
worker.config_from_object() loads the configuration from a class or object.
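Besides fixed timedelta intervals, the schedule can also be a cron-style entry built with celery.schedules.crontab; a small sketch (the daily_report entry name is made up for illustration):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'daily_report': {
        'task': 'celery_demo.hello',
        'schedule': crontab(hour=7, minute=30),  # every day at 07:30
    }
}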

As mentioned at the start, Celery's beat is the component that schedules tasks, so we use beat to drive the timer. Run the following command:

celery beat -A celery_demo.worker -l info

With beat started, you should now see output like this in the terminal:

LocalTime -> 2018-09-01 19:12:53
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2018-09-01 19:12:53,821: INFO/MainProcess] beat: Starting...
[2018-09-01 19:12:53,839: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:56,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:59,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:02,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:05,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)

This shows that tasks are being dispatched; given the time that has passed, five tasks have already been placed on the queue here.

Run the worker start command again:

celery worker -A celery_demo.worker -l info

and you will see output like the following:

[tasks]
  . celery_demo.hello

[2018-09-01 19:14:40,146: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:14:40,152: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:14:41,163: INFO/MainProcess] mingle: all alone
[2018-09-01 19:14:41,172: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:14:41,336: INFO/MainProcess] Received task: celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a]  
[2018-09-01 19:14:41,338: INFO/MainProcess] Received task: celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3]  
[2018-09-01 19:14:41,340: INFO/MainProcess] Received task: celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e]  
[2018-09-01 19:14:41,347: INFO/MainProcess] Received task: celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69]  
[2018-09-01 19:14:41,350: INFO/MainProcess] Received task: celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30]  
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72]  
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78]  
[2018-09-01 19:14:41,353: INFO/MainProcess] Received task: celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958]  
[2018-09-01 19:14:41,354: INFO/MainProcess] Received task: celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc]  
[2018-09-01 19:14:41,355: INFO/MainProcess] Received task: celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682]  
[2018-09-01 19:14:41,359: INFO/ForkPoolWorker-2] Task celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69] succeeded in 0.011140925002109725s: 'hello,1535800481.348562'
[2018-09-01 19:14:41,360: INFO/ForkPoolWorker-4] Task celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a] succeeded in 0.021171232001506723s: 'hello,1535800481.3396409'
[2018-09-01 19:14:41,361: INFO/ForkPoolWorker-2] Task celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30] succeeded in 0.0003823369988822378s: 'hello,1535800481.3609214'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-3] Task celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3] succeeded in 0.02279239599738503s: 'hello,1535800481.33969'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-4] Task celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72] succeeded in 0.0008120330021483824s: 'hello,1535800481.3620644'
[2018-09-01 19:14:41,363: INFO/ForkPoolWorker-2] Task celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78] succeeded in 0.000599899998633191s: 'hello,1535800481.362508'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-4] Task celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958] succeeded in 0.0004470089988899417s: 'hello,1535800481.3638816'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-3] Task celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc] succeeded in 0.000407947001804132s: 'hello,1535800481.363944'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-2] Task celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682] succeeded in 0.00035140299587510526s: 'hello,1535800481.3644059'
[2018-09-01 19:14:41,365: INFO/ForkPoolWorker-1] Task celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e] succeeded in 0.016904400996281765s: 'hello,1535800481.3485875'
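As noted earlier, worker and beat do not have to run in separate processes: passing -B to the worker embeds a beat scheduler in the same process, which is handy during development (for production the Celery docs recommend running them separately):

celery worker -A celery_demo.worker -B -l info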

