美文网首页
django使用celery执行异步任务时采用信号实现每个任务日

django使用celery执行异步任务时采用信号实现每个任务日

作者: 菩提老鹰 | 来源:发表于2022-11-05 18:33 被阅读0次
    django & celery & signal & after_setup_logger

    知识点

    通过本文可以获取的知识点有:

    • 1、celery信号中的 logging signal

    after_setup_logger 参考地址

    • 2、Django中如何配置和使用celery
    • 3、Django中如何加载celery 信号

    主要是Django中应用入口的 ready(self) 函数认识和使用

    • 4、Python logging自定义 Handler

    Python logging 模块介绍

    需求分析

    1、每个任务的日志独立存放,那么肯定是要能获取到任务id,然后按照任务id设定日志文件路径

    2、Django程序中执行task,那么程序中日志的写入,肯定不能使用print打印输出到启动程序Django主日志中去, 那肯定是采用logging模块配置不同的logger来实现

    3、Django怎么把自定义的logger和celery关联起来呢, celery有自己自带的logger(from celery.utils.log import get_task_logger),每个task 独立日志肯定不能放到这个自带的logger

    需求实现

    一、我们先创建Django工程和测试用的应用demoapp,然后在应用中利用celery跑一个任务task

    先给出工程和应用的结构

    ├── demoapp
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── celery
    │   │   └── __init__.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── django_celery_singal
    │   ├── __init__.py
    │   ├── __pycache__
    │   │   ├── __init__.cpython-36.pyc
    │   │   └── settings.cpython-36.pyc
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    └── manage.py
    

    注意这里有个知识点

    一般情况,Django继承celery的时候,大家默认是在工程文件夹(比如这里的 django_celery_singal)下创建celery,然后在工程的__init__.py 文件中进行import加载。

    其实这不是唯一选择。可以放到任何地方,关键在于启动celery的时候-A 参数后面根的值

    1.1、配置celery

    # demoapp/celery/__init__.py
    import os
    from celery import Celery
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_singal.settings')
    app = Celery(__name__)
    
    # 从Django的settings.py加载 celery的配置
    app.config_from_object('django.conf:settings', namespace='CELERY')
    # 自动发现应用中的tasks(应用中的tasks.py文件中定义的任务)
    app.autodiscover_tasks()
    
    # demo_celery_signal/settings.py
    # settings for celery
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = False
    CELERY_TASK_TRACK_STARTED = True
    CELERY_TASK_TIME_LIMIT = 30 * 60
    
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/11"
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/11"
    CELERY_RESULT_SERIALIZER = 'json'
    

    重点📢: 这里切记两个点:

    1、在 demoapp/_init_.py 中引入celery ,内容如 from .celery import app as celery_app

    2、切记把demoapp加入到 INSTALLED_APPS 中(建议:创建完应用第一时间就加入到该配置项中去)

    1.2、在应用中定义task

    # demoapp/tasks.py
    from celery import shared_task
    
    @shared_task
    def task_hello():
        print('Hello Task under demoapp')
    

    1.3、task绑定到APP的view中并配置URL

    # demoapp/views.py
    from django.http import HttpResponse
    from demoapp.tasks import task_hello
    
    def demo(request):
        task_hello.delay()
        return HttpResponse("Task Executed")
    

    配置APP的URL

    # demoapp/urls.py
    from django.urls import path
    from demoapp import views 
    
    urlpatterns = [
        path('demo/', views.demo, name='demo-task'),
    ]
    

    把APP的URL加入到工程URL入口中去

    # demo_celery_singal/urls.py
    from django.urls import include
    
    urlpatterns = [
        ... ...,
        path('demoapp/', include('demoapp.urls')),
    ]
    

    1.4、启动Django和celery

    启动celery,注意这里启动的方式 celery -A demoapp worker -l infodemoapp 应用,而不是工程名称django_celery_signal ,因为celery使用的位置在 demoapp 中

    django & celery.png

    启动Django服务 python manage.py runserver 127.0.0.1:8088

    1.5、访问测试celery任务

    访问 http://127.0.0.1:8088/demoapp/demo/

    前端页面显示 Task Executed (view视图的返回),在后台celery的日志中显示如下,知道Django+celery运行异步任务,搭建完成

    [2022-11-05 09:19:50,973: WARNING/ForkPoolWorker-8] Hello Task under demoapp
    [2022-11-05 09:19:50,974: WARNING/ForkPoolWorker-8]
    
    [2022-11-05 09:19:50,978: INFO/ForkPoolWorker-8] Task demoapp.tasks.task_hello[ac93e84d-9903-4b7c-a444-64cf6be5a3da] succeeded in 0.022122333000879735s: 'ok'
    

    二、自定义logging的Handler

    因为要把每个task的日志放到独立的文件中的,这个日志的处理 handler 就需要自定义了

    因为是放到日志文件中,看了logging 模块的介绍我们知道, FileHandler 是继承自StreamHandler 或者我们这里也继承StreamHandler

    import os
    from logging import StreamHandler
    from celery import current_task
    from celery.signals import task_prerun, task_postrun
    
    
    class CeleryTaskLoggerHandler(StreamHandler):
        terminator = '\r\n'
    
        def __init__(self, *args, **kwargs):
            self.task_id_fd_mapper = {}
            super().__init__(*args, **kwargs)
            # 使用 celery的task信号,设置任务开始和结束时的执行的东西
            # 主要是获取task_id 然后创建对应的独立任务日志文件
            task_prerun.connect(self.on_task_start)
            task_postrun.connect(self.on_start_end)
    
        @staticmethod
        def get_current_task_id():
            # celery 内置提供方法获取task_id 
            if not current_task:
                return
            task_id = current_task.request.root_id
            return task_id
    
        def on_task_start(self, sender, task_id, **kwargs):
            # 这里是根据task_id 定义每个任务的日志文件存放
            log_path = os.path.join('logs/', f"{task_id}.log")
            f = open(log_path, 'a')
            self.task_id_fd_mapper[task_id] = f
    
        def on_start_end(self, sender, task_id, **kwargs):
            f = self.task_id_fd_mapper.pop(task_id, None)
            if f and not f.closed:
                f.close()
            self.task_id_fd_mapper.pop(task_id, None)
    
        def emit(self, record):
            # 自定义Handler必须要重写的一个方法
            task_id = self.get_current_task_id()
            if not task_id:
                return
            try:
                f = self.task_id_fd_mapper.get(task_id)
                self.write_task_log(f, record)
                self.flush()
            except Exception:
                self.handleError(record)
    
        def write_task_log(self, f, record):
            # 日志的实际写入
            if not f:
                raise ValueError('Not found thread task file')
            msg = self.format(record)
            f.write(msg)
            f.write(self.terminator)
            f.flush()
    
        def flush(self):
            for f in self.task_id_fd_mapper.values():
                f.flush()
    

    三、Django配置调用celery的logging signal

    3.1、先创建信号处理函数

    先定义信号回调处理函数add_celery_logger_handler, 然后进行信号的绑定,绑定一般是采用装饰器的方式

    当然也可以不采用这种方式,然后在需要使用信号的地方,进行单独绑定配置(after_setup_logger.connect(add_celery_logger_handler)

    import logging
    from celery.signals import after_setup_logger
    
    @after_setup_logger.connect
    def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
        if not logger:
            return
        logger = logging.getLogger('celery_signal')
        handler = logging.FileHandler('celery_signal.log')
        formatter = logging.Formatter(logging.BASIC_FORMAT)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.info("Here call the celery logging signal - after_setup_logger")
    

    这个时候重新启动celery,是不会产生celery_signal.log 文件,那就更不会调用对应的信号回调了

    3.2、Django绑定celery的信号

    上面只是定了信号的回调函数 然后和信号进行了绑定,但是Django怎么调用celery的信号处理呢?

    答案是利用Django应用的入口ready() 函数

    # demoapp/apps.py
    from django.apps import AppConfig
    
    class DemoappConfig(AppConfig):
        default_auto_field = 'django.db.models.BigAutoField'
        name = 'demoapp'
        verbose_name = "Celery Signal App"
    
        def ready(self):
            # 可以添加如下语句测试 Django启动的时候会不会执行到这里
            print('我被执行了!')
            # 导入上面定义的 信号处理回调
            from demoapp.celery import signal_handler
            super().ready()
    

    然后重新启动celery, 查看 celery_signal.log 日志文件

    cat celery_signal.log
    INFO:celery_signal:Here call the celery logging signal - after_setup_logger
    

    可以知道,Django 绑定了 celery的信号

    四、使用celery的信号after_setup_logger绑定自定义的Handler

    修改上面定义的 信号回调函数,绑定自定义的日志处理Handler

    # demoapp/celery/signal_handler.py
    from celery.signals import after_setup_logger
    from .logger import CeleryTaskLoggerHandler
    
    @after_setup_logger.connect
    def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
        if not logger:
            return
        task_handler = CeleryTaskLoggerHandler()
        task_handler.setLevel(loglevel)
        formatter = logging.Formatter(format)
        task_handler.setFormatter(formatter)
        logger.addHandler(task_handler)
    

    这里需要先手动在工程目录下创建一个 logs 文件夹,因为handler中没有对应logs不存在做判断处理

    然后我们访问Django的view视图 http://127.0.0.1:8088/demoapp/demo/ 查看logs 目录发现有个UUID为文件名的log文件

    [ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] ls -l logs
    total 8
    -rw-r--r--  1 colinspace  wheel  138 11  5 18:17 8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
    [ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] cat logs/8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
    Hello Task under demoapp
    
    Task demoapp.tasks.task_hello[8ba8d8d6-9d28-4b64-a396-b91f11ae0df8] succeeded in 0.03539445897331461s: 'ok'
    

    这里的Hello Task under demoapp 是task输出的日志,'ok' 是task的返回值,没有的话是None

    至此,完全实现了刚开始的需求。


    完美实现~ 项目源码详见
    https://gitee.com/colin5063/django-learnning-examples/tree/master/django_celery_singal

    如果觉得文章对你有用,请不吝点赞和关注公众号搜索 全栈运维 或者 DailyJobOps

    个人博客 http://blog.colinspace.com/

    知乎平台 https://www.zhihu.com/people/colin-31-49

    CSDN平台 https://blog.csdn.net/eagle5063

    简书平台 https://www.jianshu.com/u/6d793fbacc88

    相关文章

      网友评论

          本文标题:django使用celery执行异步任务时采用信号实现每个任务日

          本文链接:https://www.haomeiwen.com/subject/ennptdtx.html