美文网首页@IT·互联网
celery封装耗时shell任务并实时获取输出的方法

celery封装耗时shell任务并实时获取输出的方法

作者: 小餐包 | 来源:发表于2024-06-29 17:41 被阅读0次

背景

我们团队使用flask+celery+ansible+mongodb开发了一个性能测试平台,简单来说要做的事情就是将用户的一系列任务(主要是shell指令)通过ansible分发到不同的机器上去执行,获取任务的回显和状态在平台上进行显示和存储。

问题

需要解决的问题如下:

  1. ansible的任务我们自动封装成了playbook,执行过程中的输出希望实时进行输出,方便定位问题;
  2. celery原生对于任务的结果是不会进行持久化存储的,因此需要自己进行状态和结果的同步;
  3. celery原生对于异常和主动停止的任务是不会进行结果存储的,同样需要额外进行处理;

方案

方法挺多,目前尝试了各种方案之后,最终使用了如下方案:

redis作为result-backend并且开启result_extended,开启一个monitor进程用于消费worker的任务状态变化事件,具体来说分如下两步:

生产者处理

第一步:在celery的task中设置实时更新状态给result_backend;

import subprocess
import tempfile
import os
import time
from collections import deque
from typing import List

from celery import Celery, Task


celery_app = Celery(
    'celery_app',
    broker=os.getenv('REDIS_URI', "redis://localhost:6379"),
    backend=os.getenv('REDIS_URI', "redis://localhost:6379")
)
celery_app.conf.update(
    task_serializer="pickle",
    result_serializer="pickle",
    result_expires=0,
    accept_content=['pickle'],
    timezone="Asia/Shanghai",
    enable_utc=False,
    result_extended=True
)


class CallbackTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        '''
        exc – The exception raised by the task.
        task_id – Unique id of the failed task.
        args – Original arguments for the task that failed.
        kwargs – Original keyword arguments for the task that failed.
        '''
        outputs = self.request.kwargs.get("outputs", [])
         # 注意点1:任务失败的时候记录outputs信息并且主动存储到后端
        self.update_state(state="FAILURE", meta={"outputs": outputs})
        self.backend.store_result(task_id, {"outputs": outputs}, "FAILURE")
        super().on_failure(exc, task_id, args, kwargs, einfo)


# 注意点1:这里要指定base类为我们重载过on_failure方法的类
@celery_app.task(base=CallbackTask, bind=True)
def run_commands_as_playbook(self, pattern, commands: List[Command], envs: dict = None):
    tmp = tempfile.NamedTemporaryFile(delete=False)
    make_playbook(pattern, commands, tmp.name, envs)

    def run_playbook_with_live_output(fname):
        cmd = f"ansible-playbook -i {INVENTORY_FILE} {fname} "
        # 注意点2、3:必须用Popen来获取实时输出,并且指定universal_newlines来获取str类型的输出
        p = subprocess.Popen(cmd, shell=True, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
        # 注意点4:限制存储的输出的最大长度
        outputs = deque(maxlen=65535)
        outputs.append(cmd)
        self.update_state(state="STARTED", meta={"outputs": list(outputs), "pid": p.pid})
        last_update_time = time.time()
        while True:
            line = p.stdout.readline()
            if line:
                outputs.append(line)
                current_time = time.time()
                # 注意点5:限制一下更新output的频率吧,1秒1次够用了
                if current_time - last_update_time > 1:
                    # 注意点1:这里就需要实时的保存用户的输出信息
                    self.update_state(state="STARTED", meta={"outputs": list(outputs), "pid": p.pid})
                    last_update_time = current_time
            else:
                break
        return list(outputs)

    return run_playbook_with_live_output(tmp.name)

需要注意的是:

  1. 对于主动中止或者抛错的任务,celery默认只会返回Exception,导致运行过程中的输出丢失,因此需要重载一下on_failure这个方法并且绑定到对应任务;
  2. 对于长时间执行的任务,如果直接调用subproces.run或者subprocess.check_ouput等方法,就无法实时获取任务的输出了,因此这里使用了Popen方法,并且将stdout重定向到subprocess.PIPE里,然后从PIPE里逐行去读取结果;
  3. subprocess.Popen里面设置了参数universal_newlines=True,否则进程的输出是bytes类型,需要额外处理一下,否则存储到redis的时候会报错的;
  4. 任务输出的日志可能非常多,我这里限制了一下输出的最大长度,我这里把中间的信息放到deque里面了,但是最后写回后端数据库的时候要注意转成list类型,否则redis可能不知道如何序列化deque类型;
  5. 每有一行输出就调用一次self.update_state操作,而这个操作涉及到将结果更新到数据库,当某些命令输出过长(比如上万行时),执行的效率就会非常的低,因此需要限制一下调用频率;

消费者处理

第二步:在monitor进程中接收任务状态变化的事件,然后将状态写回到数据库;

import os
import logging

from pymongo import MongoClient

from runner import celery_app
from urllib.parse import urlparse
from celery import states

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/myproject")
MONGO_DB = urlparse(MONGO_URI).path.strip("/")
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client[MONGO_DB]

def timestamp_to_local_time_str(ts: int):
    return datetime.fromtimestamp(ts).strftime(DATETIME_FORMAT)
    

def my_monitor(app):
    state = app.events.State()

    def dump_state(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])
        task_collection = mongo_db["task"]
        end_time = timestamp_to_local_time_str(task.timestamp) if task.state in states.READY_STATES else None
        task_collection.update_one(
            {"pid": task.uuid},
            {"$set": {"status": task.state, "end_time": end_time}})
        print('[%s] %s<%s> state changed: %s' % (
            end_time, task.name, task.uuid, task.state,))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': dump_state,
            'task-started': dump_state,
            'task-failed': dump_state,
            'task-revoked': dump_state,
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    my_monitor(celery_app)

注意点:

  1. state.tasks.get()方法返回的Task对象和CeleryTask不是一个类,因此没有date_done属性可以直接查任务的完成时间,取而代之的是一个timestamp的属性原来标识任务的状态切换时间戳;
  2. 需要单独启动一个monitor进程,建议可以用docker或者supervisor来进行管理;

总结

过程确实有点曲折,各种找资料和试错,不过最终实践下来目前这个方案效果还是可以的。

相关文章

网友评论

    本文标题:celery封装耗时shell任务并实时获取输出的方法

    本文链接:https://www.haomeiwen.com/subject/uayucjtx.html