1. 目的
本文将对流程在activiti中是怎么运行的,任务是怎样推动的进行讲解。
主要包括以下内容:
- PVM中怎么表示流程、任务、连接线,它和activiti的Model怎样转换的。
-
启动流程实例,怎么从开始节点流转到下一个节点。
-
怎样监听流程事件。
2. 主要的jar包和java类
pvm的实现在包org.activiti.engine.impl.pvm
中,我通过源码跟踪、阅读,分析出PVM有以下重要的类:
class | 功能 |
---|---|
ProcessDefinitionImpl | 流程定义 |
ExecutionEntity | 管理流程的运行 |
ActivityImpl | 流程节点的定义 |
AtomicOperation | 流程运行方法,如: AtomicOperationActivityStart(流程启动), AtomicOperationActivityEnd(流程结束) |
ActivityBehavior | 委托 |
2.1 在哪定义的
2.1.1 ProcessDefinitionImpl
ProcessDefinitionImpl
是PVM对整个流程定义。通过ProcessDefinitionImpl.createProcessInstance
可以得到流程实例管理接口PvmProcessInstance
。ProcessDefinitionImpl
的代码片段如下:
public class ProcessDefinitionImpl extends ScopeImpl implements PvmProcessDefinition {
private static final long serialVersionUID = 1L;
protected String name;
protected String key;
protected String description;
protected ActivityImpl initial;
protected Map<ActivityImpl, List<ActivityImpl>> initialActivityStacks = new HashMap<ActivityImpl, List<ActivityImpl>>();
protected List<LaneSet> laneSets;
protected ParticipantProcess participantProcess;
public ProcessDefinitionImpl(String id) {
super(id, null);
processDefinition = this;
}
public PvmProcessInstance createProcessInstance() {
if(initial == null) {
throw new ActivitiException("Process '"+name+"' has no default start activity (e.g. none start event), hence you cannot use 'startProcessInstanceBy...' but have to start it using one of the modeled start events (e.g. message start events).");
}
return createProcessInstanceForInitial(initial);
}
...
}
2.1.2 ExecutionEntity
ExecutionEntity
实现了接口ActivityExecution
,ActivityExecution
是流程运行管理接口。ExecutionEntity
提供的功能如下:
- 启动、结束、销毁流程。
- 对流程元素
ActivityImpl
的管理(添加删除修改父节点、实例ID、任务等)。
ExecutionEntity
里几个重要的方法:
-
initialize()
里面初始化task、job、variable、event。
-
start()
是流程启动方法。 -
performOperation()
是流程执行方法,流程的推动都调用performOperation()
。
2.1.3 ActivityImpl
ActivityImpl
表示单个流程元素,比如任务、开始节点、连接线、网关等。它的属性包括:variables、activityBehavior、incomingTransitions、outgoingTransitions以及元素界面位置等。ActivityImpl
的类图如下:
-
ActivityImpl
是活动节点,在流程图上对应开始节点、结束节点、各种任务节点。 -
TransitionImpl
是连接线。 -
ProcessDefinitionImpl
是整个的定义,类图中未画。类ProcessElementImpl
里面有ProcessDefinitionImpl
属性。
2.1.4 AtomicOperation
AtomicOperation
是流程执行的接口,比如流程启动,流程结束,任务启动,任务执行,任务结束等。AtomicOperation
中枚举了所有流程执行方法,AtomicOperation
的代码如下:
public interface AtomicOperation {
AtomicOperation PROCESS_START = new AtomicOperationProcessStart();
AtomicOperation PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
AtomicOperation PROCESS_END = new AtomicOperationProcessEnd();
AtomicOperation ACTIVITY_START = new AtomicOperationActivityStart();
AtomicOperation ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
AtomicOperation ACTIVITY_END = new AtomicOperationActivityEnd();
AtomicOperation TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
AtomicOperation TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
AtomicOperation TRANSITION_NOTIFY_LISTENER_TAKE = new AtomicOperationTransitionNotifyListenerTake();
AtomicOperation TRANSITION_CREATE_SCOPE = new AtomicOperationTransitionCreateScope();
AtomicOperation TRANSITION_NOTIFY_LISTENER_START = new AtomicOperationTransitionNotifyListenerStart();
AtomicOperation DELETE_CASCADE = new AtomicOperationDeleteCascade();
AtomicOperation DELETE_CASCADE_FIRE_ACTIVITY_END = new AtomicOperationDeleteCascadeFireActivityEnd();
void execute(InterpretableExecution execution);
boolean isAsync(InterpretableExecution execution);
}
2.1.5 ActivityBehavior
ActivityBehavior
是委托接口。AtomicOperation
执行过程中会调用ActivityBehavior
。ActivityBehavior
定义了execute
方法,入参类型为ActivityExecution
,ActivityBehavior
的代码如下:
public interface ActivityBehavior extends Serializable {
void execute(ActivityExecution execution) throws Exception;
}
2.2 模块之间调用顺序
下面用RuntimeServiceImpl.startProcessInstanceById()
启动一个流程,说明PVM的调用顺序。流程图如下:
2.2.1 从activiti调用PVM
RuntimeServiceImpl.startProcessInstanceById()
通过命令模式调用了方法StartProcessInstanceCmd.execute()
,StartProcessInstanceCmd
实现了以下功能:
- 从数据库获取
ProcessDefinitionEntity
(ProcessDefinitionEntity
是流程对应的数据库实体,同时继承ProcessDefinitionImpl
)。
-
通过方法
ProcessDefinitionEntity.createProcessInstance
获得ExecutionEntity
。 -
调用
ExecutionEntity.start()
启动流程。
2.2.2 ExecutionEntity调用AtomicOperation
<a id="start"></a>
StartProcessInstanceCmd
调用了ExecutionEntity.start()
就完了,任务怎么生成的。肯定在ExecutionEntity.start()
里。ExecutionEntity.start()
的代码如下:
public void start() {
if(startingExecution == null && isProcessInstanceType()) {
startingExecution = new StartingExecution(processDefinition.getInitial());
}
performOperation(AtomicOperation.PROCESS_START);
}
ExecutionEntity.start()
里面调用了performOperation
,performOperation
又对同步执行和异步执行进行了拆分,这里只看同步执行的代码:
// activiti里面到处都是`CommandContext`
protected void performOperationSync(AtomicOperation executionOperation) {
Context
.getCommandContext()
.performOperation(executionOperation, this);
}
Context.getCommandContext().performOperation
里面调用了executionOperation.execute()
。Context
里对线程之间的数据进行了隔离,但performOperation
对并发调用的判断是多余的。一次部署过程需要调用10多次performOperation
,很有必要看看代码长什么样,Context.getCommandContext().performOperation
的代码如下:
public void performOperation(AtomicOperation executionOperation, InterpretableExecution execution) {
nextOperations.add(executionOperation);
if (nextOperations.size()==1) {
try {
Context.setExecutionContext(execution);
while (!nextOperations.isEmpty()) {
AtomicOperation currentOperation = nextOperations.removeFirst();
if (log.isTraceEnabled()) {
log.trace("AtomicOperation: {} on {}", currentOperation, this);
}
if (execution.getReplacedBy() == null) {
currentOperation.execute(execution);
} else {
currentOperation.execute(execution.getReplacedBy());
}
}
} finally {
Context.removeExecutionContext();
}
}
}
2.2.3 AtomicOperation执行顺序
绕了一大圈,ExecutionEntity.start()
真正调用了AtomicOperation.PROCESS_START
(AtomicOperation.PROCESS_START = new AtomicOperationProcessStart()
)。AtomicOperationProcessStart
实现了以下功能:
- 触发listener、event。
-
设置流程运行节点为开始节点。
-
调用
AtomicOperation.PROCESS_START_INITIAL
AtomicOperationProcessStart
的代码片段如下:
@Override
protected void eventNotificationsCompleted(InterpretableExecution execution) {
if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Map<String, Object> variablesMap = null;
try {
variablesMap = execution.getVariables();
} catch (Throwable t) {
// In some rare cases getting the execution variables can fail (JPA entity load failure for example)
// We ignore the exception here, because it's only meant to include variables in the initialized event.
}
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED,
execution, variablesMap, false));
}
ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
StartingExecution startingExecution = execution.getStartingExecution();
List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
execution.setActivity(initialActivityStack.get(0)); // 当前流程执行节点就通过execution.setActivity设置了
execution.performOperation(PROCESS_START_INITIAL); // 当前操作完成之后,进行下个操作就调用execution.performOperation
}
AtomicOperation.PROCESS_START_INITIAL
里面又调用了AtomicOperation.ACTIVITY_EXECUTE
,经过多次调用不同的AtomicOperation
接口,完成流程实例的创建。
对于示例流程图,从开始节点到用户任务的调用AtomicOperation
顺序如下:
PROCESS_START
PROCESS_START_INITIAL
ACTIVITY_EXECUTE
TRANSITION_NOTIFY_LISTENER_END
TRANSITION_DESTROY_SCOPE
TRANSITION_NOTIFY_LISTENER_TAKE
TRANSITION_CREATE_SCOPE
TRANSITION_NOTIFY_LISTENER_START
ACTIVITY_EXECUTE
2.2.4 Task节点多久持久化到数据库的
用户任务对应的ActivityImpl
上绑定了UserTaskActivityBehavior
,UserTaskActivityBehavior
中对任务数据进行了持久化处理。
用户任务执行ACTIVITY_EXECUTE
操作的时候,会触发节点上绑定的Behavior
。
3. 看后感
- 接口定义到处都是,对于后期扩展是有好处的,关键代码要找好久。
-
PVM的
AtomicOperation
很适合用命令模式,为什么要用委托的方式。 -
AtomicOperation
的调用栈太深了。 -
CommandContext
到处都是。
网友评论