美文网首页ZStack博客集
ZStack源码剖析之模块鉴赏——LongJob

ZStack源码剖析之模块鉴赏——LongJob

作者: 泊浮目 | 来源:发表于2018-03-11 12:18 被阅读28次

    本文首发于泊浮目的专栏:https://segmentfault.com/blog/camile

    前言

    在ZStack中,当用户在UI上发起操作时,前端会调用后端的API对实际的资源发起操作请求。但在一个分布式系统中,我们不能假设网络是可靠的(同样要面对的还有单点故障等)——这往往导致API会超时。ZStack有默认的API超时机制,为30mins。但从UI上看来,用户的体验不是很好,如下:


    如果API遇到什么情况而一直没有响应,在这里用户也只能默默等到其超时。因为这个状态下,API是交给一个线程在执行的,见ZStack源码剖析之核心库鉴赏——ThreadFacade中的分析
    。最可怕的是,由于队列的存在,对该资源操作的API将全部处于队列中而成为等待状态。

    解决方案

    在ZStack 2.3版本开始引入了一个新的概念——LongJob。这基于ZStack的原有设计——FlowChain(我在我的博客中详细分析过FlowChain,如果不懂的小伙伴可以点这里),依靠FlowChain,我们把业务逻辑拆成一个个个Flow,并设置对应的RollBack。为了避免之后讲起来有点迷,先解释一下技术名词。

    LongJob的状态是用于被APIQuery的,也提供了进度条。并且允许start、stop、cancel等行为。

    名词

    LongJob

    长任务。以API可操作的概念具现。上面提到过,允许运行、暂停、取消等行为。

    LongJobInstance

    长任务实例。每个作业执行时,都会生成一个实例,实例会存放在LongJobVO这个数据库表中。便于UI调用API查看各个LongJobInstance的状态。

    Flow

    最小的一个业务单元。LongJob的组成,前面说过,LongJob基于FlowChain。

    LongJob Parameters

    LongJob参数。用于提交LongJob的参数,不同的参数可以区分不同的Job。

    数据结构

    LongJobVO

    @Entity
    @Table
    public class LongJobVO extends ResourceVO {
        @Column
        private String name;
    
        @Column
        private String description;
    
        @Column
        private String apiId;
    
        @Column
        private String jobName;
    
        @Column
        private String jobData;
    
        @Column
        private String jobResult;
    
        @Column
        @Enumerated(EnumType.STRING)
        private LongJobState state;
    
        @Column
        private String targetResourceUuid;
    
        @Column
        @ForeignKey(parentEntityClass = ManagementNodeVO.class, onDeleteAction = ForeignKey.ReferenceOption.SET_NULL)
        private String managementNodeUuid;
    
        @Column
        private Timestamp createDate;
    
        @Column
        private Timestamp lastOpDate;
    //忽略get set方法
    }
    

    该数据结构描述了如下关键信息:

    • targeResourceUuid - 用以描述 job 针对的资源,对于分类查找比较有用。通过 resourceUuid 可以在 ResourceVO 里找到类型。
    • apiId - 用以查询该 job 在 TaskProgressVO 中的进度信息。
    • jobName - 执行该 job 的 class 名字。参见下面的 JobExecution (类似现有的 AbstractSchedulerJob)
    • jobData - 存放执行该 job 需要的额外参数信息。

    LongJob

    public interface LongJob {
        void start(LongJobVO job, Completion completion);
        void cancel(LongJobVO job, Completion completion);
    }
    

    所有LongJob都必须实现该接口,并实现start/cancel等方法。

    LongJobFor

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LongJobFor {
        Class<?> value();
    }
    

    为具体的LongJob增加该注解,表示该LongJob针对哪个APIMessage。

    比如为BackupStorageMigrateImageJob增加注解:@LongJobFor(APIBackupStorageMigrateImageMsg.class)

    LongJobData

    interface LongJobData {
    }
    

    由于LongJob要复用现有逻辑以及保证可维护性,这里处理的代码和原先逻辑为同一处。handleApiMessage和handleLongJobMessage必须要将所有的参数抽出来再传到共用的逻辑层。不仅如此,之后定时任务也有可能做成LongJob,故此定义这个接口。

    LongJobMessageData

    
    public class LongJobMessageData implements LongJobData {
        protected final NeedReplyMessage needReplyMessage;
     
        public LongJobMessageData(NeedReplyMessage msg){
            this.needReplyMessage = msg;
        }
     
        public NeedReplyMessage getNeedReplyMessage() {
            return needReplyMessage;
        }
    }
    

    该接口实现了LongJobData(这里LongJobData仅仅用于标识一个类型),用于完成目前的需求——部分LongJob Feature来自于APIMessage的改进。而InnerMessage和APIMessage都继承于NeedReplyMessage,为加强代码可读性,将公用数据结构抽取了出来,方便调用。

    LongJobFactory

    根据jobName获取LongJob实例。

    比如当jobName为APIBackupStorageMigrateImageMsg时,获取BackupStorageMigrateImageJob实例。

    LongJobManager

    用以处理 Job 相关的 API,比如 APICancelJobMsg,APIRestartJobMsg 等等。维护 jobUuid 和相应的 CancellableSharedFlowChain 之间的关系。

    CancellableShareFlowChain

    继承 ShareFlowChain,实现 Cancellable。每个 Job 底层逻辑都必须用 CancellableSharedFlowChain 实现。

    详解

    LongJob相关的API


    在图中我们可以看到LongJob提供了几个API,较为重要的是QueryAPI——用户可以使用它来查询LongJob的一个进度状态。

    从白话讲起

    LongJob则是基于FlowChain之上扩展的,首先,每个LongJob调用与原有APIMessage行为相同的内部Message。我们以APIAddImageMsg为例,看一下它的逻辑。

    在这里,我们可以看到Msg们将其的参数都Copy到了相应的LongJobData中,并进行传参,进入了一个统一的入口。这样便于逻辑的维护,免于和原有的API handle处分为两段逻辑。

    再看调用实例

    那么是如何调用的呢?按照老规矩,我们来看一个TestCase——AddImageLongJobCase

        void testAddImage() {
            int oldSize = Q.New(ImageVO.class).list().size()
            int flag = 0
            myDescription = "my-test"
    
            env.afterSimulator(SftpBackupStorageConstant.DOWNLOAD_IMAGE_PATH) { Object response ->
                //DownloadImageMsg
                LongJobVO vo = Q.New(LongJobVO.class).eq(LongJobVO_.description, myDescription).find()
                assert vo.state == LongJobState.Running
                flag += 1
                return response
            }
    
            APIAddImageMsg msg = new APIAddImageMsg()
            msg.setName("TinyLinux")
            msg.setBackupStorageUuids(Collections.singletonList(bs.uuid))
            msg.setUrl("http://192.168.1.20/share/images/tinylinux.qcow2")
            msg.setFormat(ImageConstant.QCOW2_FORMAT_STRING)
            msg.setMediaType(ImageConstant.ImageMediaType.RootVolumeTemplate.toString())
            msg.setPlatform(ImagePlatform.Linux.toString())
    
            LongJobInventory jobInv = submitLongJob {
                sessionId = adminSession()
                jobName = "APIAddImageMsg"
                jobData = gson.toJson(msg)
                description = myDescription
            } as LongJobInventory
    
            assert jobInv.getJobName() == "APIAddImageMsg"
            assert jobInv.state == org.zstack.sdk.LongJobState.Running
    
            retryInSecs() {
                LongJobVO job = dbFindByUuid(jobInv.getUuid(), LongJobVO.class)
                assert job.state == LongJobState.Succeeded
            }
    
            int newSize = Q.New(ImageVO.class).count().intValue()
            assert newSize > oldSize
            assert 1 == flag
        }
    

    可以看到本质是将原来的APIMsg转为字符串作为LongJob的Data传入,调用起来很方便。

    实现

    再来看看它的实现,当APISubmitLongJobMsg被发送出去后,handle的地方做了什么呢?见LongJobManagerImpl

        private void handle(APISubmitLongJobMsg msg) {
            // create LongJobVO
            LongJobVO vo = new LongJobVO();
            if (msg.getResourceUuid() != null) {
                vo.setUuid(msg.getResourceUuid());
            } else {
                vo.setUuid(Platform.getUuid());
            }
            if (msg.getName() != null) {
                vo.setName(msg.getName());
            } else {
                vo.setName(msg.getJobName());
            }
            vo.setDescription(msg.getDescription());
            vo.setApiId(msg.getId());
            vo.setJobName(msg.getJobName());
            vo.setJobData(msg.getJobData());
            vo.setState(LongJobState.Waiting);
            vo.setTargetResourceUuid(msg.getTargetResourceUuid());
            vo.setManagementNodeUuid(Platform.getManagementServerId());
            vo = dbf.persistAndRefresh(vo);
            logger.info(String.format("new longjob [uuid:%s, name:%s] has been created", vo.getUuid(), vo.getName()));
            tagMgr.createTagsFromAPICreateMessage(msg, vo.getUuid(), LongJobVO.class.getSimpleName());
            acntMgr.createAccountResourceRef(msg.getSession().getAccountUuid(), vo.getUuid(), LongJobVO.class);
            msg.setJobUuid(vo.getUuid());
    
            // wait in line
            thdf.chainSubmit(new ChainTask(msg) {
                @Override
                public String getSyncSignature() {
                    return "longjob-" + msg.getJobUuid();
                }
    
                @Override
                public void run(SyncTaskChain chain) {
                    APISubmitLongJobEvent evt = new APISubmitLongJobEvent(msg.getId());
                    LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);
                    vo.setState(LongJobState.Running);
                    vo = dbf.updateAndRefresh(vo);
                    // launch the long job right now
                    ThreadContext.put(Constants.THREAD_CONTEXT_API, vo.getApiId());
                    ThreadContext.put(Constants.THREAD_CONTEXT_TASK_NAME, vo.getJobName());
                    LongJob job = longJobFactory.getLongJob(vo.getJobName());
                    job.start(vo, new Completion(msg) {
                        LongJobVO vo = dbf.findByUuid(msg.getJobUuid(), LongJobVO.class);
    
                        @Override
                        public void success() {
                            vo.setState(LongJobState.Succeeded);
                            vo.setJobResult("Succeeded");
                            dbf.update(vo);
                            logger.info(String.format("successfully run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                        }
    
                        @Override
                        public void fail(ErrorCode errorCode) {
                            vo.setState(LongJobState.Failed);
                            vo.setJobResult("Failed : " + errorCode.toString());
                            dbf.update(vo);
                            logger.info(String.format("failed to run longjob [uuid:%s, name:%s]", vo.getUuid(), vo.getName()));
                        }
                    });
    
    
                    evt.setInventory(LongJobInventory.valueOf(vo));
                    logger.info(String.format("longjob [uuid:%s, name:%s] has been started", vo.getUuid(), vo.getName()));
                    bus.publish(evt);
    
                    chain.next();
                }
    
                @Override
                public String getName() {
                    return getSyncSignature();
                }
            });
        }
    

    这段逻辑大致为:

    • 创建一个LongJob记录,以及相关的SystemTag和账户资源管理引用
    • 提交至线程池。使用LongJobFactory获取一个LongJob实例。并执行LongJob对应实现的start,在合适的时机进行状态变化。

    LongJobFactory

    public class LongJobFactoryImpl implements LongJobFactory, Component {
        private static final CLogger logger = Utils.getLogger(LongJobFactoryImpl.class);
        /**
         * Key:LongJobName
         */
        private TreeMap<String, LongJob> allLongJob = new TreeMap<>();
    
        @Override
        public LongJob getLongJob(String jobName) {
            LongJob job = allLongJob.get(jobName);
            if (null == job) {
                throw new OperationFailureException(operr("%s has no corresponding longjob", jobName));
            }
            return job;
        }
    
        @Override
        public boolean start() {
            LongJob job = null;
            List<Class> longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
            for (Class it : longJobClasses) {
                LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
                try {
                    job = (LongJob) it.newInstance();
                } catch (InstantiationException | IllegalAccessException e) {
                    e.printStackTrace();
                }
                if (null == job) {
                    logger.warn(String.format("[LongJob] class name [%s] but get LongJob instance is null ", at.getClass().getSimpleName()));
                    continue;
                }
                logger.debug(String.format("[LongJob] collect class [%s]", job.getClass().getSimpleName()));
                allLongJob.put(at.value().getSimpleName(), job);
            }
            return true;
        }
    
        @Override
        public boolean stop() {
            allLongJob.clear();
            return true;
        }
    }
    

    该FactoryImpl继承了Component接口。在ZStack Start的时候会利用反射收集带有LongJobFor这个Annotation的Class。在原先的版本中则是每一次调用的时候利用反射去寻找,会造成一个不必要的开销。故此这里也是做了一个Cache般的改进,因为在Application起来后是不会动态的去添加一种LongJob的。

    回来,还是以AddImageLongJob为例,我们来看看start时会做什么,见AddImageLongJob

    package org.zstack.image;
    
    import org.springframework.beans.factory.annotation.Autowire;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Configurable;
    import org.zstack.core.Platform;
    import org.zstack.core.cloudbus.CloudBus;
    import org.zstack.core.cloudbus.CloudBusCallBack;
    import org.zstack.core.db.DatabaseFacade;
    import org.zstack.header.core.Completion;
    import org.zstack.header.image.APIAddImageMsg;
    import org.zstack.header.image.AddImageMsg;
    import org.zstack.header.image.ImageConstant;
    import org.zstack.header.longjob.LongJobFor;
    import org.zstack.header.longjob.LongJobVO;
    import org.zstack.header.message.MessageReply;
    import org.zstack.longjob.LongJob;
    import org.zstack.utils.gson.JSONObjectUtil;
    
    
    /**
     * Created by on camile 2018/2/2.
     */
    @LongJobFor(APIAddImageMsg.class)
    @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
    public class AddImageLongJob implements LongJob {
        @Autowired
        protected CloudBus bus;
        @Autowired
        protected DatabaseFacade dbf;
    
        @Override
        public void start(LongJobVO job, Completion completion) {
            AddImageMsg msg = JSONObjectUtil.toObject(job.getJobData(), AddImageMsg.class);
            bus.makeLocalServiceId(msg, ImageConstant.SERVICE_ID);
            bus.send(msg, new CloudBusCallBack(null) {
                @Override
                public void run(MessageReply reply) {
                    if (reply.isSuccess()) {
                        completion.success();
                    } else {
                        completion.fail(reply.getError());
                    }
                }
            });
        }
    
        @Override
        public void cancel(LongJobVO job, Completion completion) {
            // TODO
            completion.fail(Platform.operr("not supported"));
        }
    }
    

    这里则是发送了一个inner msg出去,我们看一下handle处的逻辑:

        private void handle(AddImageMsg msg) {
            AddImageReply evt = new AddImageReply();
            AddImageLongJobData data = new AddImageLongJobData(msg);
            BeanUtils.copyProperties(msg, data);
            handleAddImageMsg(data, evt);
        }
    

    可以看到这里将msg的参数全部取了出来,放入一个公共结构里,并传入了真正的handle处。

    APIAddImageMsg也是这么做的:

        private void handle(final APIAddImageMsg msg) {
            APIAddImageEvent evt = new APIAddImageEvent(msg.getId());
            AddImageLongJobData data = new AddImageLongJobData(msg);
            BeanUtils.copyProperties(msg, data);
            handleAddImageMsg(data, evt);
        }
    

    在前面提到过,为了更好的可维护性,这两个Msg共用了一段逻辑。

    复用Intercepter

    了解ZStack的同学都知道,任何一条APIMsg发送的时候会进入Intercepter。那么LongJob的submit其实是把APIMsg作为参数传入了,那么如何复用之前的Intercepter呢?我们来看看LongJobApiInterceptor

    public class LongJobApiInterceptor implements ApiMessageInterceptor, Component {
        private static final CLogger logger = Utils.getLogger(LongJobApiInterceptor.class);
    
        /**
         * Key:LongJobName
         */
        private TreeMap<String, Class<APIMessage>> apiMsgOfLongJob = new TreeMap<>();
    
        @Override
        public APIMessage intercept(APIMessage msg) throws ApiMessageInterceptionException {
            if (msg instanceof APISubmitLongJobMsg) {
                validate((APISubmitLongJobMsg) msg);
            } else if (msg instanceof APICancelLongJobMsg) {
                validate((APICancelLongJobMsg) msg);
            } else if (msg instanceof APIDeleteLongJobMsg) {
                validate((APIDeleteLongJobMsg) msg);
            }
    
            return msg;
        }
    
        private void validate(APISubmitLongJobMsg msg) {
            Class<APIMessage> apiClass = apiMsgOfLongJob.get(msg.getJobName());
            if (null == apiClass) {
                throw new ApiMessageInterceptionException(argerr("%s is not an API", msg.getJobName()));
            }
            // validate msg.jobData
            Map<String, Object> config = new HashMap<>();
            List<String> serviceConfigFolders = new ArrayList<>();
            serviceConfigFolders.add("serviceConfig");
            config.put("serviceConfigFolders", serviceConfigFolders);
            ApiMessageProcessor processor = new ApiMessageProcessorImpl(config);
            APIMessage jobMsg = JSONObjectUtil.toObject(msg.getJobData(), apiClass);
            jobMsg.setSession(msg.getSession());
            jobMsg = processor.process(jobMsg);                     // may throw ApiMessageInterceptionException
            msg.setJobData(JSONObjectUtil.toJsonString(jobMsg));    // msg may be changed during validation
        }
    
        private void validate(APICancelLongJobMsg msg) {
            LongJobState state = Q.New(LongJobVO.class)
                    .select(LongJobVO_.state)
                    .eq(LongJobVO_.uuid, msg.getUuid())
                    .findValue();
    
            if (state == LongJobState.Succeeded) {
                throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is succeeded"));
            }
            if (state == LongJobState.Canceled) {
                throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is already canceled"));
            }
            if (state == LongJobState.Failed) {
                throw new ApiMessageInterceptionException(argerr("cannot cancel longjob that is failed"));
            }
        }
    
        private void validate(APIDeleteLongJobMsg msg) {
            LongJobState state = Q.New(LongJobVO.class)
                    .select(LongJobVO_.state)
                    .eq(LongJobVO_.uuid, msg.getUuid())
                    .findValue();
    
            if (state != LongJobState.Succeeded && state != LongJobState.Canceled && state != LongJobState.Failed) {
                throw new ApiMessageInterceptionException(argerr("delete longjob only when it's succeeded, canceled, or failed"));
            }
        }
    
        @Override
        public boolean start() {
            Class<APIMessage> apiClass = null;
            List<Class> longJobClasses = BeanUtils.scanClass("org.zstack", LongJobFor.class);
            for (Class it : longJobClasses) {
                LongJobFor at = (LongJobFor) it.getAnnotation(LongJobFor.class);
                try {
                    apiClass = (Class<APIMessage>) Class.forName(at.value().getName());
                } catch (ClassNotFoundException | ClassCastException e) {
                    //ApiMessage and LongJob are not one by one corresponding ,so we skip it
                    e.printStackTrace();
                    continue;
                }
                logger.debug(String.format("[LongJob] collect api class [%s]", apiClass.getSimpleName()));
                apiMsgOfLongJob.put(at.value().getSimpleName(), apiClass);
            }
            return true;
        }
    
        @Override
        public boolean stop() {
            apiMsgOfLongJob.clear();
            return true;
        }
    }
    

    逻辑很简单,通过LongJob的name找出了对应的APIMsg,并将APIMsg发向了对应Intercepter。

    在查找APIMsg这一步也是采用了Cache的思想,在Start的时候就进行了收集。

    展望

    在前面的定义中,我们提到了LongJob是允许暂停和取消行为的。这在接口中也可以看到类似的期许:

    public interface LongJob {
        void start(LongJobVO job, Completion completion);
        void cancel(LongJobVO job, Completion completion);
    }
    

    那么该如何实现它呢?在这里我们仅仅做一个展望,到时还是以释放出来的代码为准。

    Stop

    首先,在CancellableSharedFlowChain定义一个必须被实现的接口。如stop Condition,返回一个boolean。在每个Flow执行前会判断该boolean是否为true,如果为true。则保存context到db,并停止执行。

    Cancel

    同样,也是在CancellableSharedFlowChain定义一个必须被实现的接口。如cancelCondition,返回一个boolean。在每个Flow执行前会判断该boolean是否为true,如果为true。则停止执行并触发之前的所有rollback。

    Rollback的特殊技巧

    那么可能会有同学问了,在这样的设计下,如果发生了如断电的情况,必然导致无法Rollback。这种情况如果发生在一个数据中心,可以说是灾难也不为过。但是我们可以考虑一下如何实现更具有原子性Rollback。

    浅谈数据库事务的实现

    数据库的事务主要是通过Undo日志来实现。在一条记录更新前(更新到硬盘),一定要把相关的Undo日志写入硬盘;而“提交事务”这种记录,要在记录更新完毕后再写入硬盘。所谓的Undo日志,就是没有操作前的日志。如果同学们听完还是觉得有点迷,可以看这篇文章:

    可以考虑的方案

    在了解了数据库事务的实现后,我们可以大致设计出一种方案,用于保证断电后Rollback的完整性:

    1. 在一个FlowChain执行前,在DB里存入一个类似Start FlowChain的标记
    2. 定义每一个Flow的Number号,如第一个Flow为1。在Flow执行前,记录当前Flow Number到数据库,写Flow1开始执行。Flow执行完之前,写Flow1执行完毕。
    3. Flow执行完了,在DB里存入一个类似Done FlowChian的标记。这里我们把Done的那部分也看做一个Flow。

    那么在任何以步骤出问题的时候,基本都可以完成一个Rollback。我们来看一看:

    还没执行Flow的时候断电

    DB中的记录为Start FlowChain,那么是不需要Rollback的。

    执行一个Flow的时候断电

    DB中的最新记录为Flow1开始执行的话,不需要Rollback。这种分布式场景下如果需要做到强一致性,只能对每行代码做类似Undo日志的记录了。

    但是如果记录为Flow1执行完毕,开始Rollback。

    之后执行几个Flow都是参考这里的一个做法。

    小结

    在本文中,笔者和大家了解了ZStack在2.3引入的新模块——LongJob。并对其的出现的背景、解决的痛点和实现进行了分析,最后展望了一下接下来版本中可能会增强的功能。

    相关文章

      网友评论

        本文标题:ZStack源码剖析之模块鉴赏——LongJob

        本文链接:https://www.haomeiwen.com/subject/rgfotftx.html