美文网首页
django定时器_djcelery+mq的使用

django定时器_djcelery+mq的使用

作者: 猪儿打滚 | 来源:发表于2021-01-22 09:59 被阅读0次

    环境

    python 3.6
    django 2.1.8
    

    下载安装

    celery==3.1.15
    django-celery==3.3.1
    flower==0.9.3
    

    代码步骤

    -1、配置 settings.py

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        
        'djcelery', # 注册
        ...
    ]
    
    
    import djcelery  # django的celery,省去了在celery中配置django环境,并且还能在django后台管理任务
    
    ## 下面是djcelery配置
    
    # 当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的
    # 所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。
    djcelery.setup_loader()
    
    CELERY_ENABLE_UTC = True
    # CELERY_ENABLE_UTC = False
    
    # CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_TIMEZONE = TIME_ZONE
    
    BROKER_URL = 'amqp://guest@localhost//'
    
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
    
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # 任务元数据保存到数据库中
    
    CELERY_ACCEPT_CONTENT = ['application/json']
    
    CELERY_TASK_SERIALIZER = 'json'
    
    CELERY_RESULT_SERIALIZER = 'json'
    
    # CELERY_TASK_RESULT_EXPIRES = 86400  # celery任务执行结果的超时时间, 此配置注释后,任务结果不会定时清理
    
    CELERYD_CONCURRENCY = 1 if DEBUG else 10 # celery worker的并发数
    
    CELERYD_MAX_TASKS_PER_CHILD = 100  # 每个worker执行了多少任务就会销毁
    
    • 2、在settings.py同级目录下创建celery.py
    from __future__ import absolute_import
    
    import os
    
    from celery import Celery
    
    from django.conf import settings
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestPaltForm.settings') # 项目的settings文件
    
    app = Celery('TestPaltForm') # 项目名为入参
    
    app.config_from_object('django.conf:settings') # 读取settings中的celery配置
    
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    • 3、创建app:timedtask

      timedtask目录结构
    • 3.1、timedtask/models.py

    from djcelery.models import PeriodicTask
    from django.db import models
    
    
    class TaskExtend(models.Model):
        """
        拓展PeriodicTask模型
        """
        create_time = models.DateTimeField('创建时间', auto_now_add=True)
        update_time = models.DateTimeField(auto_now=True, verbose_name='更新时间', help_text='更新时间')
        email_list = models.CharField('邮箱列表', max_length=2048, default='[]')
        author = models.CharField('创建人', max_length=100, default='')
        project = models.IntegerField('任务所选项目', default=0)
        periodic_task = models.OneToOneField(PeriodicTask, on_delete=models.CASCADE, related_name='taskextend')
    
    • 3.2、自定义过滤器timedtask/filtersets.py
    """
    Author: LZL
    """
    from django_filters import rest_framework as filters
    
    from djcelery.models import PeriodicTask
    
    
    class PeriodicTaskFilter(filters.FilterSet):
        name = filters.CharFilter(field_name="name", lookup_expr='contains')
        description = filters.CharFilter(field_name="description", lookup_expr='contains')
        author = filters.CharFilter(field_name="taskextend__author", lookup_expr='contains')
        create_time = filters.DateFromToRangeFilter(field_name='taskextend__create_time')
        update_time = filters.DateFromToRangeFilter(field_name='date_changed')
    
        class Meta:
            model = PeriodicTask
            fields = '__all__'
    
    • 3.3、序列化器timedtask/serializers.py
    """
    Author: LZL
    """
    from rest_framework import serializers
    from djcelery.models import PeriodicTask
    
    from apps.timedtask.models import TaskExtend
    
    
    class TaskExtendSerializer(serializers.ModelSerializer):
        """
        用例信息序列化
        """
    
        class Meta:
            model = TaskExtend
            fields = '__all__'
    
    
    class PeriodicTaskSerializer(serializers.ModelSerializer):
        """
        用例信息序列化
        """
        # 需要对taskextend进行序列化校验
        task_extend = TaskExtendSerializer(source='taskextend', read_only=True)
        # crontab_time = CrontabScheduleSerializer(source='crontab', read_only=True)
        # 对crontab的str返回的时间进行序列化校验:
        # return '{0} {1} {2} {3} {4} (m/h/d/dM/MY)'.format(
        #     cronexp(self.minute),
        #     cronexp(self.hour),
        #     cronexp(self.day_of_week),
        #     cronexp(self.day_of_month),
        #     cronexp(self.month_of_year),
        # )
        crontab_time = serializers.ReadOnlyField(source='crontab.__str__')
    
        class Meta:
            model = PeriodicTask
            fields = '__all__'
    
    • 3.4、任务类timedtask/tasks.py
    """
    Author: LZL
    """
    from __future__ import absolute_import
    
    import os
    from datetime import datetime
    
    from celery import shared_task
    from djcelery.models import PeriodicTask
    
    from TestPaltForm import settings
    from ..testcases.models import TestCases
    from ..testcases.serializers import TestcaseEnvSerializer
    from ..envs.models import Envs
    from utils import common
    
    
    def run_task_by_cases(func, periodic_args=None):
        """
            执行以testcase为维度的定时任务
        :param func: 定时任务时,传入调用的定时任务函数
        :return:
        """
       # 业务代码略过
    
    
    @shared_task
    def periodic_run(task_args):
        """
            task_args = {
            "case_list": [18, 19],
            "env": 4,
            "project": 10,
            "periodic": 10,
            "name": "定时任务1",
            "description": "定时任务1描述",
            "receivers": ["admin@admin.com", "test@qq.com"]
        }
        :param task_args:
        :return:
        """
        return run_task_by_cases(periodic_run, task_args)
    
    
    • 3.5、timedtask/views.py
    import json
    import logging
    
    from rest_framework.response import Response
    from rest_framework import generics, status
    from djcelery.models import PeriodicTask, CrontabSchedule, PeriodicTasks
    from rest_framework.views import APIView
    
    from .serializers import PeriodicTaskSerializer, TaskExtendSerializer
    from .filtersets import PeriodicTaskFilter
    from .models import TaskExtend
    from .tasks import periodic_run
    
    
    class PeriodicTaskView(generics.ListCreateAPIView):
        """
        提供查询,创建
        """
        queryset = PeriodicTask.objects.all().order_by('-date_changed')
        serializer_class = PeriodicTaskSerializer
        filterset_class = PeriodicTaskFilter
    
        def post(self, request, *args, **kwargs):
            """
            1、先创建或获取Crontab实例
            2、保存任务实例
            data 不要带日期类数据
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            data = json.loads(json.dumps(request.data))
            # 获取或创建crontab
            # crontab_time = {
            #     'minute': '1',
            #     'hour':'1',
            #     'day_of_week':'*',
            #     'day_of_month':'*',
            #     'month_of_year':'*'
            # }
            project = int(data.pop('project'))
            # 前端传递来的crontab时间dict
            crontab_time = data.get('crontab')
            if crontab_time:
                # 如果不带crontab的任务,则是手动运行的
                # 创建定时策略,并获取到实例对象
                crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
                data['crontab'] = crontab.id  # 获取定时策略的id
            data['task'] = 'apps.timedtask.tasks.periodic_run'  # 后期可改为动态获取
            # 保存任务实例
            email_list = data.pop('email_list')
            author = data.pop('author')
            # 序列化定时任务的参数
    
            serializer = PeriodicTaskSerializer(data=data)
            if serializer.is_valid():
                try:
                    serializer.save()  # 保存定时任务
                    obj = PeriodicTask.objects.get(pk=serializer.data['id'])
                    PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
                    # 保存拓展信息
                    TaskExtend.objects.create(**{
                        'email_list': email_list,
                        'author': author,
                        'periodic_task_id': serializer.data['id'],
                        'project': project
                    })
                    return Response(serializer.data)
                except Exception as e:
                    return Response({'detail': serializer.errors}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            else:
                return Response({'message': '创建失败,已有同名任务', 'detail': serializer.errors},
                                status=status.HTTP_400_BAD_REQUEST)
    
    
    class PeriodicTaskDetailView(generics.RetrieveUpdateDestroyAPIView):
        """
        debugtalk信息单查询、修改、删除
        """
        queryset = PeriodicTask.objects.all().order_by('-date_changed')
        serializer_class = PeriodicTaskSerializer
    
        def put(self, request, *args, **kwargs):
            """
            针对编辑功能
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            data = json.loads(json.dumps(request.data))
            # 获取或创建crontab
            project = int(data.pop('project'))
            crontab_time = data.get('crontab')
            if crontab_time:
                # 如果不带crontab的任务,则是手动运行的
                crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
                data['crontab'] = crontab.id
            # 保存任务实例
            task_extend = data.pop('task_extend')
            task_extend['project'] = project  # 更新所属项目
            task_extend['email_list'] = data.pop('email_list')
            try:
                periodic_id = data['id']
                PeriodicTask.objects.filter(id=periodic_id).update(**data)
                obj = PeriodicTask.objects.filter(id=periodic_id).first()
                if obj:
                    PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
                    # 保存拓展信息
                    TaskExtend.objects.filter(pk=task_extend.get('id')).update(**task_extend)
                else:
                    return Response({'message': '定时任务不存在'}, status=status.HTTP_400_BAD_REQUEST)
                return Response({'message': '任务修改成功'}, status=status.HTTP_200_OK)
            except Exception as es:
                return Response({'message': '修改失败,已有同名任务'}, status=status.HTTP_400_BAD_REQUEST)
    
        def patch(self, request, *args, **kwargs):
            """
            局部修改,只修改enabled
            :param request:
            :param args:
            :param kwargs:
            :return:
            """
            try:
                obj = PeriodicTask.objects.get(pk=kwargs.get('pk'))
                # enabled_data = json.loads(request.data.get('enabled'))
                obj.enabled = request.data.get('enabled')
                obj.save()
                PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
                return Response({'message': '{}成功'.format('启用' if obj.enabled else '禁用')}, status=status.HTTP_200_OK)
                # return Response({'enabled': obj.enabled}, status=status.HTTP_200_OK)
            except Exception as es:
                return Response({'msg': '任务状态修改失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    
    
    class RunPeriodicTaskView(APIView):
        """
        立刻执行一次定时周期任务的任务
        """
    
        def post(self, request, *args, **kwargs):
            # 任务记录的id
            task_id = kwargs.get('pk')
            try:
                # args列表,第一个元素为task_info
                args = eval(PeriodicTask.objects.get(pk=task_id).args)
                periodic_run.delay(*args)
                return Response({'message': '任务开始运行,请稍后查询结果...'})
            except PeriodicTask.DoesNotExist:
                resp = {
                    'message': '所运行任务不存在,id:{}'.format(task_id)
                }
                return Response(resp)
    
    • 3.6、timedtask/urls.py
    """
    Author: LZL
    """
    from django.urls import path
    
    from .views import PeriodicTaskView, PeriodicTaskDetailView, RunPeriodicTaskView
    
    urlpatterns = [
        # 定时任务
        path('periodic/', PeriodicTaskView.as_view()),
        path('periodic/<int:pk>/', PeriodicTaskDetailView.as_view()),
        path('run_periodic/<int:pk>/', RunPeriodicTaskView.as_view()),  # 手动运行(定时)任务
    ]
    
    

    生成的数据表

    数据表

    启动

    启动worker:celery -A 项目名 worker -l info -P eventlet
    启动beat :celery -A 项目名beat -l info
    启动celery后台(需要查看才启动):celery flower
    启动mq:自行百度
    django后台也可以查看定时任务

    相关文章

      网友评论

          本文标题:django定时器_djcelery+mq的使用

          本文链接:https://www.haomeiwen.com/subject/lixszktx.html