美文网首页
activiti流程运行过程代码走读

activiti流程运行过程代码走读

作者: 西普士 | 来源:发表于2016-08-12 15:14 被阅读2620次

    1. 目的

    本文将对流程在activiti中是怎么运行的,任务是怎样推动的进行讲解。

    主要包括以下内容:

    1. PVM中怎么表示流程、任务、连接线,它和activiti的Model怎样转换的。
    • 启动流程实例,怎么从开始节点流转到下一个节点。

    • 怎样监听流程事件。

    2. 主要的jar包和java类

    pvm的实现在包org.activiti.engine.impl.pvm中,我通过源码跟踪、阅读,分析出PVM有以下重要的类:

    class 功能
    ProcessDefinitionImpl 流程定义
    ExecutionEntity 管理流程的运行
    ActivityImpl 流程节点的定义
    AtomicOperation 流程运行方法,如:
     AtomicOperationActivityStart(流程启动),
     AtomicOperationActivityEnd(流程结束)
    ActivityBehavior 委托

    2.1 在哪定义的

    2.1.1 ProcessDefinitionImpl

    ProcessDefinitionImpl是PVM对整个流程定义。通过ProcessDefinitionImpl.createProcessInstance可以得到流程实例管理接口PvmProcessInstanceProcessDefinitionImpl的代码片段如下:

      public class ProcessDefinitionImpl extends ScopeImpl implements PvmProcessDefinition {
    
        private static final long serialVersionUID = 1L;
    
        protected String name;
        protected String key;
        protected String description;
        protected ActivityImpl initial;
        protected Map<ActivityImpl, List<ActivityImpl>> initialActivityStacks = new HashMap<ActivityImpl, List<ActivityImpl>>();
        protected List<LaneSet> laneSets;
        protected ParticipantProcess participantProcess;
    
        public ProcessDefinitionImpl(String id) {
          super(id, null);
          processDefinition = this;
        }
    
        public PvmProcessInstance createProcessInstance() {
          if(initial == null) {
            throw new ActivitiException("Process '"+name+"' has no default start activity (e.g. none start event), hence you cannot use 'startProcessInstanceBy...' but have to start it using one of the modeled start events (e.g. message start events).");
          }
          return createProcessInstanceForInitial(initial);
        }
    
        ...  
      }
    

    2.1.2 ExecutionEntity

    ExecutionEntity实现了接口ActivityExecutionActivityExecution是流程运行管理接口。ExecutionEntity提供的功能如下:

    1. 启动、结束、销毁流程。
    • 对流程元素ActivityImpl的管理(添加删除修改父节点、实例ID、任务等)。

    ExecutionEntity里几个重要的方法:

    1. initialize()里面初始化task、job、variable、event。
    • start()是流程启动方法。

    • performOperation()是流程执行方法,流程的推动都调用performOperation()

    2.1.3 ActivityImpl

    ActivityImpl表示单个流程元素,比如任务、开始节点、连接线、网关等。它的属性包括:variables、activityBehavior、incomingTransitions、outgoingTransitions以及元素界面位置等。ActivityImpl的类图如下:

    类图
    • ActivityImpl是活动节点,在流程图上对应开始节点、结束节点、各种任务节点。

    • TransitionImpl是连接线。

    • ProcessDefinitionImpl是整个的定义,类图中未画。类ProcessElementImpl里面有ProcessDefinitionImpl属性。

    2.1.4 AtomicOperation

    AtomicOperation是流程执行的接口,比如流程启动,流程结束,任务启动,任务执行,任务结束等。AtomicOperation中枚举了所有流程执行方法,AtomicOperation的代码如下:

    
      public interface AtomicOperation {
    
        AtomicOperation PROCESS_START = new AtomicOperationProcessStart();
        AtomicOperation PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
        AtomicOperation PROCESS_END = new AtomicOperationProcessEnd();
        AtomicOperation ACTIVITY_START = new AtomicOperationActivityStart();
        AtomicOperation ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
        AtomicOperation ACTIVITY_END = new AtomicOperationActivityEnd();
        AtomicOperation TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
        AtomicOperation TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
        AtomicOperation TRANSITION_NOTIFY_LISTENER_TAKE = new AtomicOperationTransitionNotifyListenerTake();
        AtomicOperation TRANSITION_CREATE_SCOPE = new AtomicOperationTransitionCreateScope();
        AtomicOperation TRANSITION_NOTIFY_LISTENER_START = new AtomicOperationTransitionNotifyListenerStart();
    
        AtomicOperation DELETE_CASCADE = new AtomicOperationDeleteCascade();
        AtomicOperation DELETE_CASCADE_FIRE_ACTIVITY_END = new AtomicOperationDeleteCascadeFireActivityEnd();
    
        void execute(InterpretableExecution execution);
    
        boolean isAsync(InterpretableExecution execution);
      }
    
    

    2.1.5 ActivityBehavior

    ActivityBehavior是委托接口。AtomicOperation执行过程中会调用ActivityBehaviorActivityBehavior定义了execute方法,入参类型为ActivityExecutionActivityBehavior的代码如下:

    public interface ActivityBehavior extends Serializable {
      void execute(ActivityExecution execution) throws Exception;
    }
    

    2.2 模块之间调用顺序

    下面用RuntimeServiceImpl.startProcessInstanceById()启动一个流程,说明PVM的调用顺序。流程图如下:

    示例

    2.2.1 从activiti调用PVM

    RuntimeServiceImpl.startProcessInstanceById()通过命令模式调用了方法StartProcessInstanceCmd.execute()StartProcessInstanceCmd实现了以下功能:

    1. 从数据库获取ProcessDefinitionEntityProcessDefinitionEntity是流程对应的数据库实体,同时继承ProcessDefinitionImpl)。
    • 通过方法ProcessDefinitionEntity.createProcessInstance获得ExecutionEntity

    • 调用ExecutionEntity.start()启动流程。

    2.2.2 ExecutionEntity调用AtomicOperation

    <a id="start"></a>
    StartProcessInstanceCmd调用了ExecutionEntity.start()就完了,任务怎么生成的。肯定在ExecutionEntity.start()里。ExecutionEntity.start()的代码如下:

      public void start() {
        if(startingExecution == null && isProcessInstanceType()) {
          startingExecution = new StartingExecution(processDefinition.getInitial());
        }
        performOperation(AtomicOperation.PROCESS_START);
      }
    

    ExecutionEntity.start()里面调用了performOperationperformOperation又对同步执行和异步执行进行了拆分,这里只看同步执行的代码:

    //  activiti里面到处都是`CommandContext`
    protected void performOperationSync(AtomicOperation executionOperation) {
      Context
        .getCommandContext()
        .performOperation(executionOperation, this);
    }
    

    Context.getCommandContext().performOperation里面调用了executionOperation.execute()Context里对线程之间的数据进行了隔离,但performOperation对并发调用的判断是多余的。一次部署过程需要调用10多次performOperation,很有必要看看代码长什么样,Context.getCommandContext().performOperation的代码如下:

      public void performOperation(AtomicOperation executionOperation, InterpretableExecution execution) {
        nextOperations.add(executionOperation);
        if (nextOperations.size()==1) {
          try {
            Context.setExecutionContext(execution);
            while (!nextOperations.isEmpty()) {
              AtomicOperation currentOperation = nextOperations.removeFirst();
              if (log.isTraceEnabled()) {
                log.trace("AtomicOperation: {} on {}", currentOperation, this);
              }
              if (execution.getReplacedBy() == null) {
                currentOperation.execute(execution);
              } else {
                currentOperation.execute(execution.getReplacedBy());
              }
            }
          } finally {
            Context.removeExecutionContext();
          }
        }
      }
    

    2.2.3 AtomicOperation执行顺序

    绕了一大圈,ExecutionEntity.start()真正调用了AtomicOperation.PROCESS_START(AtomicOperation.PROCESS_START = new AtomicOperationProcessStart())。AtomicOperationProcessStart实现了以下功能:

    1. 触发listener、event。
    • 设置流程运行节点为开始节点。

    • 调用AtomicOperation.PROCESS_START_INITIAL

    AtomicOperationProcessStart的代码片段如下:

    @Override
      protected void eventNotificationsCompleted(InterpretableExecution execution) {
        if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
          Map<String, Object> variablesMap = null;
          try {
            variablesMap = execution.getVariables();
          } catch (Throwable t) {
            // In some rare cases getting the execution variables can fail (JPA entity load failure for example)
            // We ignore the exception here, because it's only meant to include variables in the initialized event.
          }
          Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
              ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED,
                  execution, variablesMap, false));
        }
    
        ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
        StartingExecution startingExecution = execution.getStartingExecution();
        List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());  
        execution.setActivity(initialActivityStack.get(0));   // 当前流程执行节点就通过execution.setActivity设置了    
        execution.performOperation(PROCESS_START_INITIAL);    //  当前操作完成之后,进行下个操作就调用execution.performOperation
      }
    

    AtomicOperation.PROCESS_START_INITIAL里面又调用了AtomicOperation.ACTIVITY_EXECUTE,经过多次调用不同的AtomicOperation接口,完成流程实例的创建。

    对于示例流程图,从开始节点到用户任务的调用AtomicOperation顺序如下:

    PROCESS_START  
    PROCESS_START_INITIAL  
    ACTIVITY_EXECUTE  
    TRANSITION_NOTIFY_LISTENER_END  
    TRANSITION_DESTROY_SCOPE    
    TRANSITION_NOTIFY_LISTENER_TAKE    
    TRANSITION_CREATE_SCOPE    
    TRANSITION_NOTIFY_LISTENER_START   
    ACTIVITY_EXECUTE
    

    2.2.4 Task节点多久持久化到数据库的

    用户任务对应的ActivityImpl上绑定了UserTaskActivityBehaviorUserTaskActivityBehavior中对任务数据进行了持久化处理。

    用户任务执行ACTIVITY_EXECUTE操作的时候,会触发节点上绑定的Behavior

    3. 看后感

    1. 接口定义到处都是,对于后期扩展是有好处的,关键代码要找好久。
    • PVM的AtomicOperation很适合用命令模式,为什么要用委托的方式。

    • AtomicOperation的调用栈太深了。

    • CommandContext到处都是。

    相关文章

      网友评论

          本文标题:activiti流程运行过程代码走读

          本文链接:https://www.haomeiwen.com/subject/nwqesttx.html