六十分钟入门Activiti框架原理

作者: yingzong | 来源:发表于2017-09-13 16:02 被阅读819次

    本文基于一个简单的Demo流程介绍了Activiti框架启动、部署、运行过程。

    Demo准备

    流程图文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <definitions  xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
       <process id="hello" name="hello" isExecutable="true">
            <!-- 流程开始节点 -->
            <startEvent id="start" name="Start" ></startEvent>
            <!-- serviceTask:执行me.likeyao.activiti.demo.HelloWorld的execute方法,打印hello world -->
            <serviceTask id="helloworld" name="helloworld" activiti:class="me.likeyao.activiti.demo.HelloWorld"/>
            <!-- 流程结束节点 -->
            <endEvent id="end" name="End"></endEvent>
            <!-- 流程迁移线:开始节点到serviceTask节点 -->
            <sequenceFlow id="sid-1" sourceRef="start" targetRef="helloworld"></sequenceFlow>
            <!-- 流程迁移线:serviceTask节点到结束节点 -->
            <sequenceFlow id="sid-3" sourceRef="helloworld" targetRef="end"></sequenceFlow>
        </process>
    </definitions>
    

    流程图:

    demo流程

    代码:

    public class App {
        public static void main(String[] args) {
            //创建流程引擎
            ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
            //部署流程图
            processEngine.getRepositoryService().createDeployment().addClasspathResource("hello.bpmn20.xml").deploy();
            //发起流程
            processEngine.getRuntimeService().startProcessInstanceByKey("hello");
        }
    }
    
    
    public class HelloWorld implements JavaDelegate{
        public void execute(DelegateExecution execution) throws Exception {
            System.out.println("Hello world!");
        }
    }
    
    

    Demo实现的功能是发起一个流程,执行到流程的serviceTask节点时,打印Hello world!,然后流程结束。

    源码版本:5.22.0

    框架初始化

    ProcessEngine类图

    ProcessEngine

    ProcessEngine是Activiti框架的门面,ProcessEngine本身不提供任何功能,通过getXXXService方法可以获取到对应的Service对象执行操作。Demo中涉及到的两个Service:

    • RepositoryService:流程定义和流程部署相关功能。
    • RuntimeService:流程实例相关功能(发起流程、获取流程实例变量)。

    ProcessEngineConfiguration

    ProcessEngineConfiguration负责Activiti框架的属性配置、初始化工作,初始化入口是buildProcessEngine方法,所有Activiti框架运行时需要用到的组件基本都在这里初始化:

    public ProcessEngine buildProcessEngine() {
        init();
        return new ProcessEngineImpl(this);
    }
    
    protected void init() {
        initConfigurators();
        configuratorsBeforeInit();
        initProcessDiagramGenerator();
        initHistoryLevel();
        initExpressionManager();
        initDataSource();
        initVariableTypes();
        initBeans();
        initFormEngines();
        initFormTypes();
        initScriptingEngines();
        initClock();
        initBusinessCalendarManager();
        initCommandContextFactory();
        initTransactionContextFactory();
        initCommandExecutors();
        initServices();
        initIdGenerator();
        initDeployers();
        initJobHandlers();
        initJobExecutor();
        initAsyncExecutor();
        initTransactionFactory();
        initSqlSessionFactory();
        initSessionFactories();
        initJpa();
        initDelegateInterceptor();
        initEventHandlers();
        initFailedJobCommandFactory();
        initEventDispatcher();
        initProcessValidator();
        initDatabaseEventLogging();
        configuratorsAfterInit();
    }
    

    这里有一个扩展点:ProcessEngineConfigurator。

    public interface ProcessEngineConfigurator {
        //组件初始化前
        void beforeInit(ProcessEngineConfigurationImpl processEngineConfiguration);
        //组件初始化后
        void configure(ProcessEngineConfigurationImpl processEngineConfiguration);
        //优先级
        int getPriority();
    }
    

    在init初始化方法中,initConfigurators方法通过ServiceLoader加载ProcessEngineConfigurator。随后在configuratorsBeforeInit和configuratorsAfterInit方法中分别调用ProcessEngineConfigurator的beforeInit和configure方法,使用户可以在ProcessEngineConfiguration初始化前后编程式的修改属性,替换Activiti默认组件。

    流程部署

    流程部署实现的功能是将xml格式的流程图,转化为Activiti框架运行时依赖的流程定义对象。

    RepositoryService

    RepositoryService类图

    Demo中通过以下代码部署了一个流程:

    processEngine.getRepositoryService().createDeployment().addClasspathResource("hello.bpmn20.xml").deploy();
    

    createDeployment方法中创建了DeploymentBuilder对象,DeploymentBuilder对象负责读取指定路径的流程图xml文件的内容(byte数组),并缓存在DeploymentEntity对象中:

    public DeploymentBuilder addInputStream(String resourceName, InputStream inputStream) {
        ...
        byte[] bytes = IoUtil.readInputStream(inputStream, resourceName);
        ResourceEntity resource = new ResourceEntity();
        resource.setName(resourceName);
        resource.setBytes(bytes);
        deployment.addResource(resource);
        return this;
    }
    

    最终DeploymentBuilder的deploy方法会调用RepositoryService的deploy方法,完成流程部署:

    public Deployment deploy() {
        return repositoryService.deploy(this);
    }
    

    CommandExecutor

    在RepositoryService的deploy方法中,使用了CommandExecutor对象:

    public Deployment deploy(DeploymentBuilderImpl deploymentBuilder) {
        return commandExecutor.execute(new DeployCmd<Deployment>(deploymentBuilder));
    }
    
    CommandExecutor类图

    在Activiti中,大部分操作都以Command模式实现,例如部署流程图的DeployCmd。CommandExecutor封装了一系列的CommandInterceptor,在内部形成CommandInterceptor链,在命令执行前后做了拦截。Activiti框架提供了一些
    CommandInterceptor实现:

    名称 作用
    CommandContextInterceptor 用于生成命令执行的上下文(CommandContext)。
    LogInterceptor 开启日志Debug级别后,打印日志。
    JtaTransactionInterceptor 开启Jta事务

    引入activiti-spring包,通过SpringTransactionInterceptor引入Spring的事务支持。

    CommandExecutor在ProcessEngineConfigurationImpl的initCommandExecutors方法中初始化:

    protected void initCommandExecutors() {
        initDefaultCommandConfig();
        initSchemaCommandConfig();
        initCommandInvoker();
        initCommandInterceptors();
        initCommandExecutor();
    }
    

    可以设置ProcessEngineConfigurationImpl的customPreCommandInterceptors和customPostCommandInterceptors属性,添加自定义的CommandInterceptor:

    protected void initCommandInterceptors() {
        if (commandInterceptors==null) {
            commandInterceptors = new ArrayList<CommandInterceptor>();
            if (customPreCommandInterceptors!=null) {
                commandInterceptors.addAll(customPreCommandInterceptors);
            }
            commandInterceptors.addAll(getDefaultCommandInterceptors());
            if (customPostCommandInterceptors!=null) {
                commandInterceptors.addAll(customPostCommandInterceptors);
            }
            commandInterceptors.add(commandInvoker);
        }
    }
    

    这里的pre和post是指Activiti框架getDefaultCommandInterceptors()的前后。

    CommandInvoker是CommandInterceptor链的最后一个对象,负责调用Command:

    public class CommandInvoker extends AbstractCommandInterceptor {
        @Override
        public <T> T execute(CommandConfig config, Command<T> command) {
            return command.execute(Context.getCommandContext());
        }
    }
    

    CommandContext

    CommandContext类图

    CommandContext是Activit框架Command执行的上下文,主要包含各种SessionFactory:

    sessionFactories = processEngineConfiguration.getSessionFactories();
    

    SessionFactory负责生成Session,Session是Activiti操作持久化对象的统一接口:

    名称 作用
    ProcessDefinitionEntityManager 流程定义相关读写操作。
    ExecutionEntityManager 流程实例相关读写操作。
    DefaultHistoryManager 历史记录相关读写操作

    CommandContext的生命周期

    CommandConext在CommandContextInterceptor中创建,在finally代码块中销毁:

    public <T> T execute(CommandConfig config, Command<T> command) {
        //首先尝试从线程上下文的栈中获取CommandContext
        CommandContext context = Context.getCommandContext();
        boolean contextReused = false;
        //什么时候创建新的CommandContext?
        //1、CommandConfig中指定了不复用CommandContext
        //2、当前线程上下文中不存在CommandConext
        //3、当前线程上下文中的CommandConext已经抛出异常
        if (!config.isContextReusePossible() || context == null || context.getException() != null) { 
            context = commandContextFactory.createCommandContext(command);      }  
        else {
            contextReused = true;
        }
    
        try {
            //将前面获取到的CommandContext入栈
            Context.setCommandContext(context);
            Context.setProcessEngineConfiguration(processEngineConfiguration);
            //执行下一个interceptor,在CommandInvoker中可以通过Context.getCommandContext()获取线程上下文中的CommandContext
            return next.execute(config, command);
        } catch (Exception e) {
            //记录异常信息
            context.exception(e);
        } finally {
            try {
                //如果CommandContext不可复用,用完直接关闭
                if (!contextReused) {
                    context.close();
                }
            } finally {
                //出栈操作
                Context.removeCommandContext();
                Context.removeProcessEngineConfiguration();
                Context.removeBpmnOverrideContext();
            }
        }
        
        return null;
    }
    

    Activiti的框架可以在一个Command的执行过程中,调用另外一个Command,所以会出现是否需要复用CommandContext的选项,默认值为true。

    流程的解析

    在DeployCmd中,首先调用DeploymentEntityManager持久化存储DeploymentEntity对象:

    commandContext.getDeploymentEntityManager().insertDeployment(deployment);
    

    然后调用DeploymentManager部署流程(流程解析):

    commandContext.getProcessEngineConfiguration().getDeploymentManager().deploy(deployment, deploymentSettings);
    
    DeploymentEntityManager
    DeploymentEntityManager类图

    DeploymentEntityManager的deploy方法中循环调用Deployer对象的deploy方法,Activiti默认的Deployer是BpmnDeployer。

    另外DeploymentEntityManager中还缓存了解析好的流程定义对象和Bpmn模型对象。

    Activiti持久化的是流程图xml文件,每次系统重新启动都要执行一次“deploy”操作,生成ProcessDefinitionEntity对象。

    BpmnDeployer
    BpmnDeployer类图

    BpmnDeployer的deploy方法中包含几个操作(代码缩略版):

    public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
        ...
        BpmnParse bpmnParse = bpmnParser.createParse().sourceInputStream(inputStream).setSourceSystemId(resourceName).deployment(deployment).name(resourceName);
        bpmnParse.execute();
        for (ProcessDefinitionEntity processDefinition: bpmnParse.getProcessDefinitions()) {
            if (deployment.isNew()) {
                ProcessDefinitionEntity latestProcessDefinition = ...
                if (latestProcessDefinition != null) {
                    processDefinitionVersion = latestProcessDefinition.getVersion() + 1;
                }else{
                    processDefinitionVersion = 1;
                }
                processDefinition.setId(idGenerator.getNextId());
                dbSqlSession.insert(processDefinition);
            }
            ...
        }
    }
    
    • 通过BpmnParser对象创建BpmnParse。
    • 调用BpmnParse的execute方法,将inputStream中的流程图转化为ProcessDefinitionEntity。
    • 持久化ProcessDefinitionEntity对象。
    BpmnParse
    BpmnParse类图

    在BpmnParse的execute中完成了xml文件到ProcessDefinitionEntity对象的转化:

    public BpmnParse execute() {
        //xml->bpmnModel
        bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema, enableSafeBpmnXml, encoding);
        //bpmnModel-> ProcessDefinitionEntity
        transformProcessDefinitions();
    }
    
    protected void transformProcessDefinitions() {
        for (Process process : bpmnModel.getProcesses()) {
            bpmnParserHandlers.parseElement(this, process);
        }
    }
    

    在流程定义解析过程中,会涉及到两套模型:

    • Bpmn模型(由BpmnXMLConverter完成转换)
    • PVM模型(由BpmnParseHandlers完成转换)

    Bpmn模型

    Bpmn模型

    PVM模型

    PVM模型

    Bpmn模型更偏向于xml节点的描述,PVM模型是运行时模型。Bpmn模型中的ServiceTask、StartEvent等会统一映射转换为PVM的ActivityImpl对象,ServiceTask和StartEvent等节点行为上的差别,体现在ActivityImpl对象持有的不同的ActivityBehavior上。

    运行流程

    创建流程实例

    在demo中通过RuntimeService发起流程实例:

    processEngine.getRuntimeService().startProcessInstanceByKey("hello");
    

    在startProcessInstanceByKey方法中执行StartProcessInstanceCmd命令:

    public class StartProcessInstanceCmd<T> implements Command<ProcessInstance>, Serializable {
        ...
        public ProcessInstance execute(CommandContext commandContext) {
            //获取流程定义
            ProcessDefinitionEntity processDefinition = ...
            //创建流程实例
            ExecutionEntity processInstance = processDefinition.createProcessInstance(businessKey);
            //开始流程
            processInstance.start();
            return processInstance;
        }
        ...
    }
    
    

    在StartProcessInstanceCmd方中通过流程定义ProcessDefinitionEntity创建了流程实例ExecutionEntity:

    ExecutionEntity

    ExecutionEntity实现了一些重要接口:

    • PVM相关的接口,赋予了ExecutionEntity流程驱动的能力,例如single、start方法。
    • 实现VariableScope接口让ExecutionEntity可以持久上下文变量。
    • ProcessInstance接口暴露了ExecutionEntity关联的ProcessDefinitionEntity的信息。
    • PersistentObject接口代表ExecutionEntity对象是需要持久化。

    在ExecutionEntity中维护类一个属性:activity。activity属性代表当前执行到哪个节点,在创建ExecutionEntity过程中会设置activity,使流程从某一个节点开始,默认是开始节点。

    最后StartProcessInstanceCmd还调用ExecutionEntity的start方法开始驱动流程:

    public void start() {
        performOperation(AtomicOperation.PROCESS_START);
    }
    

    驱动流程

    Activiti框架的流程运行于PVM模型之上,在流程运行时主要涉及到PVM中几个对象:ActivityImpl、TransitionImpl和ActivityBehavior。

    • ActivityImpl:ActivityImpl是流程节点的抽象,ActivityImpl维护流程图中节点的连线,包括有哪些进线,有哪些出线。另外还包含节点同步/异步执行等信息。
    • TransitionImpl:TransitionImpl包含source和target两个属性,连接了两个流程节点。
    • ActivityBehavior:每一个ActivityImpl对象都拥有一个ActivityBehavior对象,ActivityBehavior代表节点的行为。

    ActivityImpl、TransitionImpl和ActivityBehavior只是描述了流程的节点、迁移线和节点行为,真正要让ExecutionEntity流转起来,还需要AtomicOperation的驱动:

    AtomicOperation PROCESS_START = new AtomicOperationProcessStart();
    AtomicOperation PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
    AtomicOperation PROCESS_END = new AtomicOperationProcessEnd();
    AtomicOperation ACTIVITY_START = new AtomicOperationActivityStart();
    AtomicOperation ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
    AtomicOperation ACTIVITY_END = new AtomicOperationActivityEnd();
    AtomicOperation TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
    AtomicOperation TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
    AtomicOperation TRANSITION_NOTIFY_LISTENER_TAKE = new AtomicOperationTransitionNotifyListenerTake();
    AtomicOperation TRANSITION_CREATE_SCOPE = new AtomicOperationTransitionCreateScope();
    AtomicOperation TRANSITION_NOTIFY_LISTENER_START = new AtomicOperationTransitionNotifyListenerStart();
        
    AtomicOperation DELETE_CASCADE = new AtomicOperationDeleteCascade();
    AtomicOperation DELETE_CASCADE_FIRE_ACTIVITY_END = new AtomicOperationDeleteCascadeFireActivityEnd();
    

    在ExecutionEntity的start方法中,调用了PROCESS_START,PROCESS_START做了几件事:

    • 获取流程定义级别定义的监听start事件的ExecutionListener,调用notify方法。
    • 如果开启了事件功能,发布ActivitiEntityWithVariablesEvent和ActivitiProcessStartedEvent。
    • 调用PROCESS_START_INITIAL。

    PROCESS_START_INITIAL也实现了类似的功能:

    • 获取初始节点上定义的监听start事件的ExecutionListener,调用notify方法。
    • 调用ACTIVITY_EXECUTE。

    在Demo流程执行中涉及的AtomicOperation的链路主要包括:

    • ACTIVITY_EXECUTE:调用当前activity的behavior。
    • TRANSITION_NOTIFY_LISTENER_END:某个activity节点执行完毕,调用节点上声明的监听end事件的ExecutionListener。
    • TRANSITION_NOTIFY_LISTENER_TAKE:触发线上的ExecutionListener。
    • TRANSITION_NOTIFY_LISTENER_START:某个activity节点即将开始执行,调用节点上的监听start事件的ExecutionListener。

    以Demo流程中的ServiceTask节点helloworld为例,在执行ACTIVITY_EXECUTE时,会获取activity关联的behavior:

    public class AtomicOperationActivityExecute implements AtomicOperation {
        public void execute(InterpretableExecution execution) {
            ...
            ActivityImpl activity = (ActivityImpl) execution.getActivity();
            ActivityBehavior activityBehavior = activity.getActivityBehavior();
            activityBehavior.execute(execution);
            ...
        }
    }
    

    ServiceTask解析时关联的是ServiceTaskJavaDelegateActivityBehavior,execution方法:

    public void execute(ActivityExecution execution) throws Exception {
        //execution中调用了me.likeyao.activiti.demo.HelloWorld
        execute((DelegateExecution) execution);
        //离开当前节点
        leave(execution);
    }
    

    在leave方法中调用了:

    bpmnActivityBehavior.performDefaultOutgoingBehavior(execution);
    

    performDefaultOutgoingBehavior方法会在当前activity的
    出线中选择一条,使流程流向下一个节点。在Demo中只有一条线存在:

    protected void performOutgoingBehavior(ActivityExecution execution, 
              boolean checkConditions, boolean throwExceptionIfExecutionStuck, List<ActivityExecution> reusableExecutions) {
    
        if (transitionsToTake.size() == 1) {
            execution.take(transitionsToTake.get(0));
        }         
              
    }
    

    最终take方法会将流程驱动权交还到AtomicOperation中:

    public class ExecutionEntity{
        ...
        public void take(PvmTransition transition, boolean fireActivityCompletionEvent) {
            ...
            setActivity((ActivityImpl)transition.getSource());
            setTransition((TransitionImpl) transition);
            performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_END);
            ...
        }
        ...
    }
    

    AtomicOperation的问题

    按照AtomicOperation的驱动模式,只有当遇到UserTask等需要等待single信号的节点,调用才会返回。这意味着当调用RuntimeService启动一个流程实例时,要一直等到流程运行到一个UserTask节点调用才会返回,如果流程比较长耗时非常验证。

    另一个问题是当流程图比较复杂,ExecutionListener数量比较多时,AtomicOperation之间的互相调用会导致调用栈非常深。

    AtomicOperation驱动模式与ExecutionEntity、Behavior等绑定的比较紧密,暂时没有特别好的办法替换掉。

    小结

    本文主要介绍了Activiti框架的启动、部署、运行的主链路,并没有深入BPMN规范和Activit功能的具体实现,后续打算根据Activiti的用户手册,详细分析每个功能的使用和实现。

    相关文章

      网友评论

        本文标题:六十分钟入门Activiti框架原理

        本文链接:https://www.haomeiwen.com/subject/khflsxtx.html