美文网首页
Python 异步任务队列Celery 使用

Python 异步任务队列Celery 使用

作者: handsomePeng | 来源:发表于2019-09-27 16:08 被阅读0次

    Celery 介绍

    • celery是处理大量消息的分布式系统

    • 专注于实时处理的异步任务队列

    • 同时支持任务调度

    celery原理.png

    在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

    使用场景

    • 异步任务(async task): 将耗时任务交由celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等

    • 定时任务(crontab):由 Celery Beat 进程周期性地将任务发往任务队列,比如每日数据统计等

    其实现架构如下图所示:

    celery原理.png

    可以看到,Celery 主要包含以下几个模块:

    • 任务模块 Task

      包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列

    • 消息中间件 Broker

      Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redisMongoDB 等。其中,Backend可通过Celery配置中的一个配置项 CELERY_RESULT_BACKEND进行设置,可以是Database backend,也可以是Cache backend。

    Celery 相关安装

    celery可以通过pip自动安装。

    pip install celery
    

    broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。

    ------------------------------rabbitmq相关----------------------------------------------------------

    官网安装方法:http://www.rabbitmq.com/install-windows.html

    启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached

    rabbitmq已经启动,可以打开页面来看看 地址:http://localhost:15672/#/

    用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。

    ------------------------------rabbitmq相关--------------------------------------------------------

    开始使用celery

    项目结构如下:

    项目结构.png

    使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:

    ---------------celery_config.py------------------------
    celery配置文件
    -------------------------------------------------------
    ​
    from configs import DEFAULT_CELERY_BROKER_URL
    from celery import Celery, platforms
    from celery.schedules import crontab
    ​
    ​
    SCHEDULE_QUEUE = 'saa.2.schedule.queue'
    DEFAULT_QUEUE = 'saa.2.default.queue'
    ​
    class BaseCeleryConfig(object):
     """Base configuration for celery.
     Each celery instance configs class will extend from this class
     """
     # 设置任务序列化方法
     CELERY_TASK_SERIALIZER = 'json'
     # 设置结果序列化方法
     CELERY_RESULT_SERIALIZER = 'json'
     # worker并发数 默认为CPU核数
     CELERYD_CONCURRENCY = 2
     # 开启延迟确认(默认为false)
     # PS: 开启延迟确认使worker只有在任务完成(成功/失败)的情况下,才向broker发送确认信息。在任务信息不能丢失的场景中,这个功能是及其有用的
     CELERY_ACKS_LATE = True
     # 忽略任务结果(成功or失败的信息)
     CELERY_IGNORE_RESULT = True
     # 为True时,即使CELERY_IGNORE_RESULT=True,也会存储错误。
     CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
     # 任务预取功能:即每个worker在broker获取task时会尽量拿n个,以压缩获取任务的通讯成本,1表示关闭此功能;0表示尽可能多拿;
     CELERYD_PREFETCH_MULTIPLIER = 1
     # 监控客户端事件队列被删除前的过期时间
     CELERY_EVENT_QUEUE_EXPIRES = 7200
     # 时区,默认为UTC
     CELERY_TIMEZONE = 'UTC'
    
    class DefaultCeleryConfig(BaseCeleryConfig):
     """Default celery configuration for this project"""
     # 指定需要导入的数据模块
     CELERY_IMPORTS = (
     'logic.celery_task'
     )
    ​
     # 路由器列表:将任务路由到相应的队列
     CELERY_ROUTES = {
     'enter_company_lead': {
     'queue': DEFAULT_QUEUE,
     'routing_key': DEFAULT_QUEUE
     },
     'company_version_daily_statistics': {
     'queue': SCHEDULE_QUEUE,
     'routing_key': SCHEDULE_QUEUE
     },
     'daily_statistics': {
     'queue': SCHEDULE_QUEUE,
     'routing_key': SCHEDULE_QUEUE
     }
     }
    ​
     # 任务队列
     CELERY_QUEUES = {
     SCHEDULE_QUEUE: {
     'exchange': SCHEDULE_QUEUE,
     'exchange_type': 'direct',
     'routing_key': SCHEDULE_QUEUE
     },
     DEFAULT_QUEUE: {
     'exchange': DEFAULT_QUEUE,
     'exchange_type': 'direct',
     'routing_key': DEFAULT_QUEUE
     }
     }
    
    ​
    class ScheduleCeleryConfig(BaseCeleryConfig):
     """
     Schedule celery configuration for this project.
     All schedule tasks run in default celery queue.
     """
     # 使用本地时间
     CELERY_ENABLE_UTC = False
     CELERY_TIMEZONE = 'Asia/Shanghai'
     # 定时任务调度配置:
     # task:需要执行的任务名称; 
     # schedule:任务执行时间(timedelta对象或crontab对象); 
     # options: 额外选项,均是apply_async()可用的参数
     CELERYBEAT_SCHEDULE = {
     'company_version_daily_statistics': {
     'task': 'company_version_daily_statistics',
     'schedule': crontab(minute=0, hour=23),  # 每天23点统计
     'options': {
     'queue': SCHEDULE_QUEUE,
     'routing_key': SCHEDULE_QUEUE,
     'exchange': SCHEDULE_QUEUE,
     'exchange_type': 'direct'
     }
     },
     'daily_statistics': {
     'task': 'daily_statistics',
     'schedule': crontab(minute=0, hour=1),  # 每天1点统计
     'options': {
     'queue': SCHEDULE_QUEUE,
     'routing_key': SCHEDULE_QUEUE,
     'exchange': SCHEDULE_QUEUE,
     'exchange_type': 'direct'
     }
     }
     }
    ​
    def create_celery_instance(name, config, broker=DEFAULT_CELERY_BROKER_URL):
       """
       创建Celery实例
    ​
       Args:
       name: celery名
       config: celery的配置
       broker: celery的broker
    ​
       Returns:
       celery_instance: celery实例
    ​
       """
        inst = Celery(name, broker=broker)
        inst.config_from_object(config)
        platforms.C_FORCE_ROOT = True  # running celery worker by rooter
        return inst
    ​
    

    Celery 的配置比较多,可以在 官方配置文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每个配置项的含义。

    ---------------default_celery.py------------------------
    默认celery实例,并加载celery配置项
    --------------------------------------------------------
    ​
    from configs.celery_config import DefaultCeleryConfig, create_celery_instance
    ​
    ​
    default_inst = create_celery_instance(
     name='saas_2_default_celery',
     config=DefaultCeleryConfig)
    
    
    
    ---------------schedule_celery.py------------------------
    任务调度celery实例
    --------------------------------------------------------
    ​
    from configs.celery_config import ScheduleCeleryConfig, create_celery_instance
    ​
    schedule_inst = create_celery_instance(
     name='saas_2_schedule_celery',
     config=ScheduleCeleryConfig)
    
    
    ---------------celery_task.py------------------------
    任务函数
    -----------------------------------------------------
    ​
    @default_inst.task(name='company_version_daily_statistics')
    @celery_logging_decorator
    def company_version_daily_statistics():
     """
     企业账号版本统计
     :return:
     """
     date = datetime.now().strftime('%Y-%m-%d')
     logic_company_version_daily_statistics(date)
     logic_update_company_platform_statistics(date)
    
    ​
    @default_inst.task(name='daily_statistics')
    @celery_logging_decorator
    def daily_statistics():
     """ 高级筛选每日统计"""
     # 第二天统计前一天的
     yesterday = datetime.now() - timedelta(days=1)
     dt_str = yesterday.strftime('%Y-%m-%d')
    ​
     # 转线索数、营销触达数统计
     logic_daily_company_statistics(dt_str)
    ​
     # 计算洞客指数
     logic_cal_doncus_index(dt_str)
    
    @default_inst.task(name='enter_company_lead')
    @celery_logging_decorator
    def enter_task(company_id, update_frequency):
     """
     进入规则
     :param company_id:
     :param update_frequency:   更新频率
     * `1` - 每天
     * `2` - 每周日
     * `3` - 每月一号
     :return:
     """
     enter_rule_task(company_id, update_frequency)
    
    

    异步任务调用方法:

    # celery异步任务触发方法一:只支持传递任务函数的参数
    enter_task.delay('sdfwerfde2323434', 1)
    ​
    # celery异步任务触发方法二:支持任务函数参数,也支持任务的执行选项
    enter_task.apply_async(args=['sdfwerfde2323434', 1], kwargs={})
    -----------------------------------------------------------------------------------------------
    # 其他参数
    task_id:为任务分配唯一id,默认是uuid;
    countdown : 设置该任务等待一段时间再执行,单位为s;
    eta : 定义任务的开始时间;eta=time.time()+10;
    expires : 设置任务时间,任务在过期时间后还没有执行则被丢弃;
    retry : 如果任务失败后, 是否重试;使用true或false,默认为true
    shadow:重新指定任务的名字str,覆盖其在日志中使用的任务名称;
    retry_policy : {},重试策略.如下:
     max_retries : 最大重试次数, 默认为 3 次.
     interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
     interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
     interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .
    ​
    routing_key:自定义路由键;
    queue:指定发送到哪个队列;
    exchange:指定发送到哪个交换机;
    priority:任务队列的优先级,0到255之间,对于rabbitmq来说0是最高优先级;
    serializer:任务序列化方法;通常不设置;
    compression:压缩方案,通常有zlib, bzip2
    headers:为任务添加额外的消息;
    link:任务成功执行后的回调方法;是一个signature对象;可以用作关联任务;
    link_error: 任务失败后的回调方法,是一个signature对象;
    ​
    # 如下
    add.apply_async((2, 2), retry=True, retry_policy={
     'max_retries': 3,
     'interval_start': 0,
     'interval_step': 0.2,
     'interval_max': 0.2,
    })
    -----------------------------------------------------------------------------------------------
    ​
    # celery异步任务触发方法三: 可以发送未被注册的异步任务,即没有被default_inst.task装饰的任务
    default_inst.send_task("enter_company_lead", ['sdfwerfde2323434', 1])
    
    

    当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:

    celery -A configs.default_celery worker -P gevent -l info
    

    定时任务执行方法

    启动beat,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。

    命令行启动:

    celery -A configs.schedule_celery beat -l info
    

    supervisor&celery

    如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。

    supervisor.conf如下:

    ; supervisor config file
    ​
    [unix_http_server]
    file=/var/run/supervisor.sock   ; (the path to the socket file)
    chmod=0700                       ; sockef file mode (default 0700)
    ​
    [supervisord]
    logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
    pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
    childlogdir=/var/log/supervisor            ; ('AUTO' child log dir, default $TEMP)
    ​
    ; the below section must remain in the config file for RPC
    ; (supervisorctl/web interface) to work, additional interfaces may be
    ; added by defining them in separate rpcinterface: sections
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
    ​
    [supervisorctl]
    serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket
    ​
    ; The [include] section can just contain the "files" setting.  This
    ; setting can list multiple files (separated by whitespace or
    ; newlines).  It can also contain wildcards.  The filenames are
    ; interpreted as relative to this file.  Included files *cannot*
    ; include files themselves.
    ​
    [include]
    files = /etc/supervisor/conf.d/*.conf
    
    
    ---------------------celery.conf---------------------------
    celery 相关supervisor配置
    ---------------------celery.conf---------------------------
    ​
    [program:project_name-celery]
    command=/root/project_name/bin/celery -A configs.default_celery worker -P gevent -l info
    autostart=true
    directory=/root/project_name/project_name
    autorestart=true
    startsecs=10
    startretries=1
    stopwaitsecs=600
    redirect_stderr=true
    stdout_logfile_maxbytes=50MB
    stdout_logfile=/var/log/supervisor/project_name/celery-access.log
    stderr_logfile=/var/log/supervisor/project_name/celery-error.log
    user=root
    ​
    [program:project_name-beat-celery]
    command=/root/project_name/bin/celery -A configs.schedule_celery beat -l info
    autostart=true
    directory=/root/project_name/project_name
    autorestart=true
    startsecs=10
    startretries=1
    stopwaitsecs=600
    redirect_stderr=true
    stdout_logfile_maxbytes=50MB
    stdout_logfile=/var/log/supervisor/project_name/beat-celery-access.log
    stderr_logfile=/var/log/supervisor/project_name/beat-celery-error.log
    user=root
    
    

    相关文章

      网友评论

          本文标题:Python 异步任务队列Celery 使用

          本文链接:https://www.haomeiwen.com/subject/wkwzectx.html