背景
我们团队使用flask+celery+ansible+mongodb开发了一个性能测试平台,简单来说要做的事情就是将用户的一系列任务(主要是shell指令)通过ansible分发到不同的机器上去执行,获取任务的回显和状态在平台上进行显示和存储。
问题
需要解决的问题如下:
- ansible的任务我们自动封装成了playbook,执行过程中的输出希望实时进行输出,方便定位问题;
- celery原生对于任务的结果是不会进行持久化存储的,因此需要自己进行状态和结果的同步;
- celery原生对于异常和主动停止的任务是不会进行结果存储的,同样需要额外进行处理;
方案
方法挺多,目前尝试了各种方案之后,最终使用了如下方案:
redis作为result-backend并且开启result_extended,开启一个monitor进程用于消费worker的任务状态变化事件,具体来说分如下两步:
生产者处理
第一步:在celery的task中设置实时更新状态给result_backend;
import subprocess
import tempfile
import os
import time
from collections import deque
from typing import List
from celery import Celery, Task
celery_app = Celery(
'celery_app',
broker=os.getenv('REDIS_URI', "redis://localhost:6379"),
backend=os.getenv('REDIS_URI', "redis://localhost:6379")
)
celery_app.conf.update(
task_serializer="pickle",
result_serializer="pickle",
result_expires=0,
accept_content=['pickle'],
timezone="Asia/Shanghai",
enable_utc=False,
result_extended=True
)
class CallbackTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
'''
exc – The exception raised by the task.
task_id – Unique id of the failed task.
args – Original arguments for the task that failed.
kwargs – Original keyword arguments for the task that failed.
'''
outputs = self.request.kwargs.get("outputs", [])
# 注意点1:任务失败的时候记录outputs信息并且主动存储到后端
self.update_state(state="FAILURE", meta={"outputs": outputs})
self.backend.store_result(task_id, {"outputs": outputs}, "FAILURE")
super().on_failure(exc, task_id, args, kwargs, einfo)
# 注意点1:这里要指定base类为我们重载过on_failure方法的类
@celery_app.task(base=CallbackTask, bind=True)
def run_commands_as_playbook(self, pattern, commands: List[Command], envs: dict = None):
tmp = tempfile.NamedTemporaryFile(delete=False)
make_playbook(pattern, commands, tmp.name, envs)
def run_playbook_with_live_output(fname):
cmd = f"ansible-playbook -i {INVENTORY_FILE} {fname} "
# 注意点2、3:必须用Popen来获取实时输出,并且指定universal_newlines来获取str类型的输出
p = subprocess.Popen(cmd, shell=True, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
# 注意点4:限制存储的输出的最大长度
outputs = deque(maxlen=65535)
outputs.append(cmd)
self.update_state(state="STARTED", meta={"outputs": list(outputs), "pid": p.pid})
last_update_time = time.time()
while True:
line = p.stdout.readline()
if line:
outputs.append(line)
current_time = time.time()
# 注意点5:限制一下更新output的频率吧,1秒1次够用了
if current_time - last_update_time > 1:
# 注意点1:这里就需要实时的保存用户的输出信息
self.update_state(state="STARTED", meta={"outputs": list(outputs), "pid": p.pid})
last_update_time = current_time
else:
break
return list(outputs)
return run_playbook_with_live_output(tmp.name)
需要注意的是:
- 对于主动中止或者抛错的任务,celery默认只会返回
Exception
,导致运行过程中的输出丢失,因此需要重载一下on_failure
这个方法并且绑定到对应任务; - 对于长时间执行的任务,如果直接调用
subproces.run
或者subprocess.check_ouput
等方法,就无法实时获取任务的输出了,因此这里使用了Popen
方法,并且将stdout重定向到subprocess.PIPE
里,然后从PIPE里逐行去读取结果; -
subprocess.Popen
里面设置了参数universal_newlines=True
,否则进程的输出是bytes
类型,需要额外处理一下,否则存储到redis的时候会报错的; - 任务输出的日志可能非常多,我这里限制了一下输出的最大长度,我这里把中间的信息放到
deque
里面了,但是最后写回后端数据库的时候要注意转成list类型,否则redis可能不知道如何序列化deque
类型; - 每有一行输出就调用一次
self.update_state
操作,而这个操作涉及到将结果更新到数据库,当某些命令输出过长(比如上万行时),执行的效率就会非常的低,因此需要限制一下调用频率;
消费者处理
第二步:在monitor进程中接收任务状态变化的事件,然后将状态写回到数据库;
import os
import logging
from pymongo import MongoClient
from runner import celery_app
from urllib.parse import urlparse
from celery import states
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/myproject")
MONGO_DB = urlparse(MONGO_URI).path.strip("/")
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client[MONGO_DB]
def timestamp_to_local_time_str(ts: int):
return datetime.fromtimestamp(ts).strftime(DATETIME_FORMAT)
def my_monitor(app):
state = app.events.State()
def dump_state(event):
state.event(event)
task = state.tasks.get(event['uuid'])
task_collection = mongo_db["task"]
end_time = timestamp_to_local_time_str(task.timestamp) if task.state in states.READY_STATES else None
task_collection.update_one(
{"pid": task.uuid},
{"$set": {"status": task.state, "end_time": end_time}})
print('[%s] %s<%s> state changed: %s' % (
end_time, task.name, task.uuid, task.state,))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-succeeded': dump_state,
'task-started': dump_state,
'task-failed': dump_state,
'task-revoked': dump_state,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
my_monitor(celery_app)
注意点:
-
state.tasks.get()
方法返回的Task
对象和CeleryTask
不是一个类,因此没有date_done
属性可以直接查任务的完成时间,取而代之的是一个timestamp
的属性原来标识任务的状态切换时间戳; - 需要单独启动一个monitor进程,建议可以用docker或者supervisor来进行管理;
总结
过程确实有点曲折,各种找资料和试错,不过最终实践下来目前这个方案效果还是可以的。
网友评论