Celery 介绍
-
celery是处理大量消息的分布式系统
-
专注于实时处理的异步任务队列
-
同时支持任务调度
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
使用场景
-
异步任务(async task): 将耗时任务交由celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
-
定时任务(crontab):由 Celery Beat 进程周期性地将任务发往任务队列,比如每日数据统计等
其实现架构如下图所示:
celery原理.png可以看到,Celery 主要包含以下几个模块:
-
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
-
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
-
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
-
任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。其中,Backend可通过Celery配置中的一个配置项 CELERY_RESULT_BACKEND进行设置,可以是Database backend,也可以是Cache backend。
Celery 相关安装
celery可以通过pip自动安装。
pip install celery
broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。
------------------------------rabbitmq相关----------------------------------------------------------
官网安装方法:http://www.rabbitmq.com/install-windows.html
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,可以打开页面来看看 地址:http://localhost:15672/#/
用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。
------------------------------rabbitmq相关--------------------------------------------------------
开始使用celery
项目结构如下:
项目结构.png使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:
---------------celery_config.py------------------------
celery配置文件
-------------------------------------------------------
from configs import DEFAULT_CELERY_BROKER_URL
from celery import Celery, platforms
from celery.schedules import crontab
SCHEDULE_QUEUE = 'saa.2.schedule.queue'
DEFAULT_QUEUE = 'saa.2.default.queue'
class BaseCeleryConfig(object):
"""Base configuration for celery.
Each celery instance configs class will extend from this class
"""
# 设置任务序列化方法
CELERY_TASK_SERIALIZER = 'json'
# 设置结果序列化方法
CELERY_RESULT_SERIALIZER = 'json'
# worker并发数 默认为CPU核数
CELERYD_CONCURRENCY = 2
# 开启延迟确认(默认为false)
# PS: 开启延迟确认使worker只有在任务完成(成功/失败)的情况下,才向broker发送确认信息。在任务信息不能丢失的场景中,这个功能是及其有用的
CELERY_ACKS_LATE = True
# 忽略任务结果(成功or失败的信息)
CELERY_IGNORE_RESULT = True
# 为True时,即使CELERY_IGNORE_RESULT=True,也会存储错误。
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
# 任务预取功能:即每个worker在broker获取task时会尽量拿n个,以压缩获取任务的通讯成本,1表示关闭此功能;0表示尽可能多拿;
CELERYD_PREFETCH_MULTIPLIER = 1
# 监控客户端事件队列被删除前的过期时间
CELERY_EVENT_QUEUE_EXPIRES = 7200
# 时区,默认为UTC
CELERY_TIMEZONE = 'UTC'
class DefaultCeleryConfig(BaseCeleryConfig):
"""Default celery configuration for this project"""
# 指定需要导入的数据模块
CELERY_IMPORTS = (
'logic.celery_task'
)
# 路由器列表:将任务路由到相应的队列
CELERY_ROUTES = {
'enter_company_lead': {
'queue': DEFAULT_QUEUE,
'routing_key': DEFAULT_QUEUE
},
'company_version_daily_statistics': {
'queue': SCHEDULE_QUEUE,
'routing_key': SCHEDULE_QUEUE
},
'daily_statistics': {
'queue': SCHEDULE_QUEUE,
'routing_key': SCHEDULE_QUEUE
}
}
# 任务队列
CELERY_QUEUES = {
SCHEDULE_QUEUE: {
'exchange': SCHEDULE_QUEUE,
'exchange_type': 'direct',
'routing_key': SCHEDULE_QUEUE
},
DEFAULT_QUEUE: {
'exchange': DEFAULT_QUEUE,
'exchange_type': 'direct',
'routing_key': DEFAULT_QUEUE
}
}
class ScheduleCeleryConfig(BaseCeleryConfig):
"""
Schedule celery configuration for this project.
All schedule tasks run in default celery queue.
"""
# 使用本地时间
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
# 定时任务调度配置:
# task:需要执行的任务名称;
# schedule:任务执行时间(timedelta对象或crontab对象);
# options: 额外选项,均是apply_async()可用的参数
CELERYBEAT_SCHEDULE = {
'company_version_daily_statistics': {
'task': 'company_version_daily_statistics',
'schedule': crontab(minute=0, hour=23), # 每天23点统计
'options': {
'queue': SCHEDULE_QUEUE,
'routing_key': SCHEDULE_QUEUE,
'exchange': SCHEDULE_QUEUE,
'exchange_type': 'direct'
}
},
'daily_statistics': {
'task': 'daily_statistics',
'schedule': crontab(minute=0, hour=1), # 每天1点统计
'options': {
'queue': SCHEDULE_QUEUE,
'routing_key': SCHEDULE_QUEUE,
'exchange': SCHEDULE_QUEUE,
'exchange_type': 'direct'
}
}
}
def create_celery_instance(name, config, broker=DEFAULT_CELERY_BROKER_URL):
"""
创建Celery实例
Args:
name: celery名
config: celery的配置
broker: celery的broker
Returns:
celery_instance: celery实例
"""
inst = Celery(name, broker=broker)
inst.config_from_object(config)
platforms.C_FORCE_ROOT = True # running celery worker by rooter
return inst
Celery 的配置比较多,可以在 官方配置文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每个配置项的含义。
---------------default_celery.py------------------------
默认celery实例,并加载celery配置项
--------------------------------------------------------
from configs.celery_config import DefaultCeleryConfig, create_celery_instance
default_inst = create_celery_instance(
name='saas_2_default_celery',
config=DefaultCeleryConfig)
---------------schedule_celery.py------------------------
任务调度celery实例
--------------------------------------------------------
from configs.celery_config import ScheduleCeleryConfig, create_celery_instance
schedule_inst = create_celery_instance(
name='saas_2_schedule_celery',
config=ScheduleCeleryConfig)
---------------celery_task.py------------------------
任务函数
-----------------------------------------------------
@default_inst.task(name='company_version_daily_statistics')
@celery_logging_decorator
def company_version_daily_statistics():
"""
企业账号版本统计
:return:
"""
date = datetime.now().strftime('%Y-%m-%d')
logic_company_version_daily_statistics(date)
logic_update_company_platform_statistics(date)
@default_inst.task(name='daily_statistics')
@celery_logging_decorator
def daily_statistics():
""" 高级筛选每日统计"""
# 第二天统计前一天的
yesterday = datetime.now() - timedelta(days=1)
dt_str = yesterday.strftime('%Y-%m-%d')
# 转线索数、营销触达数统计
logic_daily_company_statistics(dt_str)
# 计算洞客指数
logic_cal_doncus_index(dt_str)
@default_inst.task(name='enter_company_lead')
@celery_logging_decorator
def enter_task(company_id, update_frequency):
"""
进入规则
:param company_id:
:param update_frequency: 更新频率
* `1` - 每天
* `2` - 每周日
* `3` - 每月一号
:return:
"""
enter_rule_task(company_id, update_frequency)
异步任务调用方法:
# celery异步任务触发方法一:只支持传递任务函数的参数
enter_task.delay('sdfwerfde2323434', 1)
# celery异步任务触发方法二:支持任务函数参数,也支持任务的执行选项
enter_task.apply_async(args=['sdfwerfde2323434', 1], kwargs={})
-----------------------------------------------------------------------------------------------
# 其他参数
task_id:为任务分配唯一id,默认是uuid;
countdown : 设置该任务等待一段时间再执行,单位为s;
eta : 定义任务的开始时间;eta=time.time()+10;
expires : 设置任务时间,任务在过期时间后还没有执行则被丢弃;
retry : 如果任务失败后, 是否重试;使用true或false,默认为true
shadow:重新指定任务的名字str,覆盖其在日志中使用的任务名称;
retry_policy : {},重试策略.如下:
max_retries : 最大重试次数, 默认为 3 次.
interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .
routing_key:自定义路由键;
queue:指定发送到哪个队列;
exchange:指定发送到哪个交换机;
priority:任务队列的优先级,0到255之间,对于rabbitmq来说0是最高优先级;
serializer:任务序列化方法;通常不设置;
compression:压缩方案,通常有zlib, bzip2
headers:为任务添加额外的消息;
link:任务成功执行后的回调方法;是一个signature对象;可以用作关联任务;
link_error: 任务失败后的回调方法,是一个signature对象;
# 如下
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
-----------------------------------------------------------------------------------------------
# celery异步任务触发方法三: 可以发送未被注册的异步任务,即没有被default_inst.task装饰的任务
default_inst.send_task("enter_company_lead", ['sdfwerfde2323434', 1])
当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:
celery -A configs.default_celery worker -P gevent -l info
定时任务执行方法
需启动beat,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。
命令行启动:
celery -A configs.schedule_celery beat -l info
supervisor&celery
如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。
supervisor.conf如下:
; supervisor config file
[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)
[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.
[include]
files = /etc/supervisor/conf.d/*.conf
---------------------celery.conf---------------------------
celery 相关supervisor配置
---------------------celery.conf---------------------------
[program:project_name-celery]
command=/root/project_name/bin/celery -A configs.default_celery worker -P gevent -l info
autostart=true
directory=/root/project_name/project_name
autorestart=true
startsecs=10
startretries=1
stopwaitsecs=600
redirect_stderr=true
stdout_logfile_maxbytes=50MB
stdout_logfile=/var/log/supervisor/project_name/celery-access.log
stderr_logfile=/var/log/supervisor/project_name/celery-error.log
user=root
[program:project_name-beat-celery]
command=/root/project_name/bin/celery -A configs.schedule_celery beat -l info
autostart=true
directory=/root/project_name/project_name
autorestart=true
startsecs=10
startretries=1
stopwaitsecs=600
redirect_stderr=true
stdout_logfile_maxbytes=50MB
stdout_logfile=/var/log/supervisor/project_name/beat-celery-access.log
stderr_logfile=/var/log/supervisor/project_name/beat-celery-error.log
user=root
网友评论