美文网首页Python自动化运维
django使用celery做异步执行过程

django使用celery做异步执行过程

作者: Blues_rat | 来源:发表于2018-01-31 09:37 被阅读12次

    执行过程持续返回

    简要

    1. 挑用插件:
        django-celery  jenkins
    2. 基于Python版本:
        Python2.7
    3. 基于django版本:
        django1.8.18
    

    主要问题:

    需要把jenkins代码发布的执行过程持续输出,后台执行完成后消息数据返回到前台。
    

    解决方案:

    使用django-celery的异步执行操作,把异步执行的数据返回到rabbitmq,从rabbitmq读取数据写入数据库,页面中返回数据库的信息,页面使用定时器,当返回结果不为失败或者成功时,5秒刷新页面,读取最新的操作内容。
    

    具体代码如下:

    1. django setting.py配置:
    
        djcelery.setup_loader()
        BROKER_URL = 'amqp://guest:guest@localhost:5672//'
        BROKER_POOL_LIMIT = 0
        #CELERY_TASK_SERIALIZER = 'json'                    (由于设置Json以后输入也必须要使用Json格式,暂时未启用)
        #CELERY_RESULT_SERIALIZER = 'json'
        CELERY_ENABLE_UTC = True
        CELERY_RESULT_BACKEND= 'amqp'
    
        INSTALLED_APPS = (
        'djcelery',
        'app',
        )
    
    2. django celery.py配置:
        
        #!/usr/bin/env python
        # -*- coding:utf-8 -*-
        
        from __future__ import absolute_import
        import os
        from celery import Celery
        from django.conf import settings
        from app.backend.tasks import *
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CMDB.settings')
        
        app = Celery('CMDB')
        
        app.config_from_object('django.conf:settings')
        app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    3. app view配置:
    
        def job_release(request):
            if request.method == 'GET':
                try:
                    global job_enable                                 (执行开关,防止多次执行)
                    id = request.GET.get('id')                        (获取指定的ID)
                    name =  request.GET.get('name')                    (执行会带name,返回到执行页面不会带name,防止返回被执行)
                    all_result = jenkins_return.objects.filter(id=id) (获取当前执行的项目信息)
                    job_name = all_result[0]                          (执行的项目名称)
                    full_name = all_result.values()[0]['full_name']   (执行的项目全称,jenkins需要指定到子目录才能build)
                except:
                    all_result = job_release_return.objects.filter(id=id)    (返回,或者刷新,还是获取到当前的内容)
                    return render_to_response("job_release.html",locals())
                else:
                    if job_enable == 1:                                     (执行开关,为1的时候开启执行功能)
                        res = goto_html.delay(full_name,job_name)           (异步执行第一步,准备执行)
                        import_goto_html.delay(res.get())                   (导入到数据库)
                        ret_in = get_job_runinng_in.delay(full_name,res.get()) (异步执行第二步,获取执行中状态内容)
                        ret_end =get_job_runinng_end.delay(full_name,res.get()) (异步执行第三部,取得执行结果)
                        job_enable = 0                                          (执行完成后功能取消,当返回到执行页面时复位)
                    time.sleep(0.5)                                             (返回内容)
                    all_result = job_release_return.objects.all()
                    return render_to_response("job_release.html",locals())    
    
    4. js配置(很简单,不是执行成功或者执行失败时刷新页面):
    
        <script language="JavaScript">
        function myrefresh(){
            window.location.reload();
            }
        function show(){
            var mytable = document.getElementById("dataTables-example").rows[1].cells[7].innerHTML;
            if(mytable == "执行成功" || mytable == "执行失败"){
            return true;}
            else{
            myrefresh();}
        }
        setTimeout('show()',5000);
        </script>       
         
    5. app task.py配置:
        #!/usr/bin/env python
        # -*- coding:utf-8 -*-
        from __future__ import absolute_import
        from celery import task
        from app.backend.fb_jenkins import *
        import urllib
        import jenkins
        import time
        from datetime import datetime
        import sys
        
        jenkins_server_url=""                         (jenkins地址)
        host=""                                       (主机IP)
        user_id = ""                                  (jenkins用户)
        api_token = ""                                (jenkins token)
        #server=jenkins.Jenkins(jenkins_server_url, username=user_id, password=api_token)   (这里两种方式,一直使用oken,另一种使用passwd)
        server=jenkins.Jenkins(jenkins_server_url, username=user_id, password="*****")    
        joblist=[]
        
        #(jenkins的方法,获取项目构建过程)
        @task
        def get_job_output(jobName):
            last = server.get_job_info(jobName)['lastBuild']['number']
            return server.get_build_console_output(jobName,last)
        
        # (进入执行页面,并执行job任务)
        @task
        def goto_html(full_name,jobName):
            result = get_job_output(full_name)
            start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            result_log = " {0} {1} 程序开始构建".format(start_time, full_name)
            parameters = {}
            parameters['project0001'] = 'test001'
            server.build_job(full_name,parameters)
            results = []
            workspace = result.split('Building in workspace')[1].strip().split('\n')[0]
            next_build_number = server.get_job_info(full_name)['nextBuildNumber']
            results.extend([jobName,host,workspace,start_time,"end_time",next_build_number,next_build_number,"执行开始","result",result_log])
            return results
    
        # (写入数据库)
        @task
        def import_goto_html(ret):
            import_release_sql(ret)
            return True
        
        # (执行中状态)
        @task
        def get_job_runinng_in(full_name,ret):
            result = get_job_output(full_name)
            results = []
            jobName = ret[0]
            start_time = ret[3]
            workspace = ret[2]
            next_build_number = ret[5]
            time_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            for i in range(20):
                builds = server.get_running_builds()
                if len(builds)!=0:break
                time.sleep(0.2)
            result_log = ret[9] + "\n {0} {1} 程序构建中".format(time_now, full_name)
            results.extend(
                [jobName, host, workspace, start_time, "end_time", next_build_number, next_build_number, "执行中", "result",
                 result_log])
            import_goto_html(results)
            return results
    
        # (执行完成状态)
        @task
        def get_job_runinng_end(full_name,ret):
            results = []
            jobName = ret[0]
            start_time = ret[3]
            workspace = ret[2]
            next_build_number = ret[5]
            time_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            time.sleep(3)
            while True:
                try:
                    build_info = server.get_build_info(full_name, next_build_number)
                except:
                    continue
                else:
                   if build_info[u'building'] == False:
                        break
                   time.sleep(1)
        
            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            result = get_job_output(full_name)
            result_log = ret[9] + "\n {0} {1} 程序构建中".format(time_now, full_name) + "\n {0} {1} 程序构建完成".format(end_time,full_name)
            results.extend(
                [jobName, host, workspace, start_time, end_time, next_build_number, next_build_number, "执行成功", result,
                 result_log])
            import_goto_html(results)
            return results
    

    相关文章

      网友评论

        本文标题:django使用celery做异步执行过程

        本文链接:https://www.haomeiwen.com/subject/kurhzxtx.html