美文网首页
celery定时任务及异常报警

celery定时任务及异常报警

作者: Gin_714d | 来源:发表于2019-08-01 18:18 被阅读0次

celery-beat分发定时任务

beat 概念

celery beat 是一种调度者;
celery beat 定期将任务及参数发送给集群中的可用worker节点,然后任务由可用worker节点执行。

beat配置

在celery的配置文件中

beat_schedule = {
     'tt': { # 任务名称
         'task': 'tasks.task_beat.tt', # 真实执行的task路径
         'schedule': crontab(minute= '*/1'), # crontab timer
         'options': {
             'queue': 'monitor:taskq' # 是否用单独的队列,如用单独队列,需要指定
         }
     },
}

beat/worker启动

编写好task
使用 celery beat -A tasks -l debug 启动beat
使用 celery -A tasks worker -Q 队列名称 -l info -E 启动worker

任务完成情况追踪

使用celery event机制,监听task执行成功时会发出的 task-succeeded 事件。
开启event 需要在配置文件中 配置 task_send_sent_event = True
event 介绍

def monitor_events():
    def on_event(event):
        """
        :param event:
        :type event dict
        :return:
        """
        task_result = literal_eval(event['result'])
        deal_with_task_result....      

    debug_logger = auto_log('beat recv')
    logger = influx_logger('influx_monitor_events')

    while 1:

        try:
            debug_logger.info('start...')
            with app.connection() as conn:
                recv = app.events.Receiver(conn, handlers={'task-succeeded': on_event})
                recv.capture(limit=None, timeout=10, wakeup=True)
        except (InterruptedError, KeyboardInterrupt):
            debug_logger.info(' recv end ...')
            break
        except Exception as e:
            debug_logger.error(e)

任务异常处理

有两种方式
1.重载Task.on_success、Task.on_failure
2.监听FAILURE事件

重载Task.on_success、Task.on_failure

这里采用自定义的Task类(Task是celery的内置组件。Task是可以从任何可调用对象创建的类。Task用于封装我们自定义的任务,在Task中,celery处理任务的分发,结果/异常的处理,任务的执行),来增加任务异常的处理。

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

             Run by the worker if the task executes successfully.

             Arguments:
                 retval (Any): The return value of the task.
                 task_id (str): Unique id of the executed task.
                 args (Tuple): Original arguments for the executed task.
                 kwargs (Dict): Original keyword arguments for the executed task.

             Returns:
                 None: The return value of this handler is ignored.
        """
        with open('test.txt', 'a+') as f:
            f.write(self.name+str(retval)+'\n')


    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler.

             This is run by the worker when the task fails.

             Arguments:
                 exc (Exception): The exception raised by the task.
                 task_id (str): Unique id of the failed task.
                 args (Tuple): Original arguments for the task that failed.
                 kwargs (Dict): Original keyword arguments for the task that failed.
                 einfo (~billiard.einfo.ExceptionInfo): Exception information.

             Returns:
                 None: The return value of this handler is ignored.
        """
        dd_notice(
            {'alert_reason': 'celery beat exec failure', 'Exception': exc, 'task_id': task_id, 'task_name': self.name,
             'args': args, 'kwargs': kwargs, 'einfo': einfo, 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M%S')}, task_failure_dd_uri)

然后将任务代码改为

@app.task(base=CallbackTask)
def xx():
    """
    :return:
    """
    from tasks.assignments.xx import XX
    return XX().run()

监听FAILURE事件

把上面监听task-succeeded的地方改为task-failure

总结

至此我们可以用celery执行定时任务,并且用event进行任务执行成功/失败进行相应的处理,也可以直接使用自定义的Task类,直接处理。

相关文章

  • celery定时任务及异常报警

    celery-beat分发定时任务 beat 概念 celery beat 是一种调度者;celery beat ...

  • celery with rabbitmq Control com

    使用 celery 处理定时任务和异步任务的时候,出现与 rabbitmq 连接中断的问题。具体异常如下: 查看对...

  • 基于celery及redis封装sanic的api

    背景介绍 celery 简介 其实celery不是消息队列,是一任务异步调用及定时任务调用处理的工具,并提供了后端...

  • Celery源码笔记(六) 定时任务

    定时任务定时任务是celery的一个重要功能,本节会对celery的定时任务进行分析。根据之前分析可知,在cele...

  • 通过celery_one避免Celery定时任务重复执行

    通过celery_one避免Celery定时任务重复执行 在使用Celery统计每日访问数量的时候,发现一个任务会...

  • Day 6

    celery定时任务 新手教程基本完成

  • django celery redis

    # celery redis celery 执行异步 定时 任务等的工作 架构: 1. broker : 前端接收...

  • CELERY 定时任务

    1. 介绍 Celery是一个强大的分布式任务队列,他可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运...

  • celery

    Django配置Celery执行异步任务和定时任务 celery是一个基于python开发的简单、灵活且可靠的分布...

  • 2019-08-08

    Django配置Celery执行异步任务和定时任务 celery是一个基于python开发的简单、灵活且可靠的分布...

网友评论

      本文标题:celery定时任务及异常报警

      本文链接:https://www.haomeiwen.com/subject/nvycdctx.html