美文网首页
Activiti工作流引擎的使用、思考与总结

Activiti工作流引擎的使用、思考与总结

作者: 梦想实现家_Z | 来源:发表于2021-06-16 01:17 被阅读0次

    从业务角度来说,至少需要满足以下功能:

    1.查询待办事项列表

    2.待办事项的办理

    3.查看已办历史

    从技术角度来说:

    1.activiti引擎提供的api功能过于分散,对于开发人员来说,没有一个统一的接口。最终导致开发效率不高,代码维护困难。

    2.画流程图不确定性比较多,节点名称和监听器设置没有统一规范。同样也会导致效率问题。

    用请假的流程来举例,首先,看一下流程图怎么画?

    针对于画图,我们先定一个小规范,要求所有的节点上的Listeners全部指定为一个我们的自定义listener,无论是ExecutionListener还是TaskListener,全部指定为一个固定的自定义listener。

    // 全部统一设置成ActivitiListener监听器
    public class ActivitiListener implements TaskListener, ExecutionListener {
    
        private static final long serialVersionUID = 6071133014325140738L;
            // 用户任务监听器被触发的话,就会调用下面的notify方法
        @Override
        public void notify(DelegateTask delegateTask) {
            // 获取当前任务的taskDefinitionKey,其实就是节点id
            String taskDefinitionKey = delegateTask.getTaskDefinitionKey();
            // 通过taskDefinitionKey和目标类型去IOC容器中查找指定的Bean
            UserTaskListener userTaskHandler =
                    ApplicationContextUtil.getBean(taskDefinitionKey, UserTaskListener.class);
            if (Objects.nonNull(userTaskHandler)) {
                // 执行目标Bean中的方法
                userTaskHandler.notify(delegateTask);
            }
        }
            // ExecutionListener回调方法
        @Override
        public void notify(DelegateExecution execution) {
            // 获取IOC容器
            ApplicationContext webApplicationContext = ApplicationContextUtil.getApplicationContext();
            // 从容器中查找目标类型Bean列表
            Map<String, AbstractElement> beans =
                    webApplicationContext.getBeansOfType(AbstractElement.class);
            if (CollectionUtils.isEmpty(beans)) {
                return;
            }
            // 通过activityId查找符合要求的Bean实例。在bpmn中,其实就是元素的id。
            String activityId = execution.getCurrentActivityId();
            AbstractElement abstractElement = beans.get(activityId);
            if (Objects.nonNull(abstractElement)) {
                // 执行目标Bean的notify方法
                abstractElement.notify(execution);
                return;
            }
            // 如果通过activityId找不到,就通过processDefinitionKey查找目标Bean
            String processDefinitionKey =
                    CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
            abstractElement = beans.get(processDefinitionKey);
            if (Objects.nonNull(abstractElement)) {
                // 执行目标Bean的notify方法
                abstractElement.notify(execution);
            }
        }
    }
    

    按照上述设定,那么一旦触发ExecutionListener就会去IOC容器中找AbstractElement类,我们来看一下这个类:

    // 这个接口是需要所有UserTask来实现的
    public interface UserTaskListener {
    
        /** @param delegateTask */
        void notify(DelegateTask delegateTask);
    }
    // 把所有的节点都抽象成AbstractElement,主要在于需要让每一个节点都是ExecutionListener的子类
    // 这个设计的灵感来自于bpmn流程图里面任何组件元素都可以指定ExecutionListener,那意味着我如果想要统一规范所有的组件元素的Listener为ActivitiListener的话,那么我就需要把所有的组件都抽象成一个目标,这样我在ActivitiListener的实现里面就可以通过一定的规则找到目标组件的实现了。
    // 最终,通过总结,要求所有元素的id都作为Bean的名字,也就是说,通过这个元素的id可以在IOC容器中找到对应的Bean实例
    public abstract class AbstractElement extends AbstractActivitiServiceImpl
            implements ExecutionListener {
    
        private static final long serialVersionUID = 4836235578847862052L;
    
        @Autowired private ActivitiBusinessDataRepository activitiBusinessDataRepository;
    
        @Transactional(rollbackFor = Exception.class)
        @Override
        public void notify(DelegateExecution execution) {
            // 如果当前对象不支持,就直接结束
            if (!support(execution)) {
                return;
            }
            Map<String, Object> variables = Maps.newHashMap();
            Map<String, Object> data = Maps.newHashMap();
            // 执行前
            beforeExecute(execution, variables, data);
            // 真正的执行目标代码
            execute(execution, variables, data);
            // 执行后
            afterExecute(execution, data);
        }
    
        /**
         * 执行业务
         *
         * @param execution
         * @param variables
         * @param data
         */
        protected abstract void execute(
                DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data);
    
        /**
         * 后置处理
         *
         * @param execution
         * @param data
         */
        protected abstract void afterExecute(DelegateExecution execution, Map<String, Object> data);
    
        /**
         * @param execution
         * @return
         */
        protected boolean support(DelegateExecution execution) {
            return false;
        }
    
        /**
         * 读取前置系统参数
         *
         * @param execution
         * @param variables
         * @param data
         */
        protected void beforeExecute(
                DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
            String taskDefinitionKey = execution.getCurrentActivityId();
            String processDefinitionKey =
                    CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
            // 从数据库里面把自定义配置读取到variables里面。如果没有这种需要的话,可以去掉。
            activitiBusinessDataRepository.readProcessInstanceVariable(
                    processDefinitionKey, taskDefinitionKey, variables);
            if (!CollectionUtils.isEmpty(variables)) {
                for (Map.Entry<String, Object> e : variables.entrySet()) {
                    String name = e.getKey();
                    Object value = e.getValue();
                    execution.setVariable(e.getKey(), e.getValue());
                    data.put(name, value);
                }
            }
        }
    
        protected void saveData(DelegateExecution execution, Map<String, Object> data) {
            Timestamp now = new Timestamp(System.currentTimeMillis());
            String id = execution.getId();
            String taskDefinitionKey = execution.getCurrentActivityId();
            String name = execution.getCurrentFlowElement().getName();
            String processInstanceId = execution.getProcessInstanceId();
            String businessKey = execution.getProcessInstanceBusinessKey();
            TaskNodeInfo taskNodeInfo = TaskNodeInfo.newInstance(id, name, taskDefinitionKey);
    
            // 自动保存的流程节点数据;把一些我认为一定要保存的数据自动保存下来,也就是说,不依赖于开发人员,我自己认为一定要保存的数据,比如businessKey一定要保存
            List<FieldInfo> autoSaveTaskNodeVariables =
                    activitiBusinessDataRepository.getAutoSaveTaskNodeVariables(
                            Executor.defaultExecutor(), businessKey, data);
            // processInstanceData是抽象方法,需要每一个子类实现;返回的数据是当前流程实例需要保存的信息,以便后续节点可以使用
            List<FieldInfo> processInstanceDataList = processInstanceData(execution, data);
            // taskNodeData是抽象方法,子类需要实现;返回的数据是当前节点的快照信息
            List<FieldInfo> taskNodeDataList = taskNodeData(execution, data);
    
            // 记录流程实例数据;保存当前流程实例数据
            activitiBusinessDataRepository.saveProcessInstanceData(
                    processInstanceDataList, null, processInstanceId, businessKey, now);
            // 记录节点数据;把当前节点的相关信息保存起来
            activitiBusinessDataRepository.saveTaskNodeData(
                    autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now);
            // 自动记录节点历史,不需要开发者操心
            // 一般历史数据大概可以归纳为时间、节点、任务、流程实例ID,附加信息等等,所以花点心思总结一下,应该是可以固定下来的,所以就不需要子类去实现
            activitiBusinessDataRepository.saveHistory(
                    businessKey,
                    processInstanceId,
                    Executor.defaultExecutor(),
                    taskNodeInfo,
                    data,
                    now);
        }
        // 返回的数据是当前流程实例需要保存的信息
        protected abstract List<FieldInfo> processInstanceData(
                DelegateExecution execution, Map<String, Object> data);
      
        // 返回的数据是当前节点的快照信息
        protected abstract List<FieldInfo> taskNodeData(
                DelegateExecution execution, Map<String, Object> data);
    
        /**
         * 读取变量参数
         *
         * @param processDefinitionKey
         * @param taskDefinitionKey
         * @param variables
         */
        protected void readTransactorAndSendNoticeVariable(
                String processDefinitionKey, String taskDefinitionKey, Map<String, Object> variables) {
            activitiBusinessDataRepository.readTransactorAndSendNoticeVariable(
                    processDefinitionKey, taskDefinitionKey, variables);
        }
    }
    

    综上所述,我们总结两点规范:

    • 1.所有的Listener都是ActivitiListener

    • 2.所有的组件id都作为Bean的名字,通过组件id去IOC容器中查找目标Bean;如果找不到,就通过processDefinitionKey查找目标Bean

    关于AbstractActivitiServiceImpl这个类,其实就是集成了activiti里面一些常用的api,我认为这是大多数组件都会调用的,所以特地放在这个抽象类里面,以便所有AbstractElement的子类都可以使用里面的方法。只和activiti引擎相关数据表打交道的逻辑可以放在这个类里面实现。

    public abstract class AbstractActivitiServiceImpl
            implements IActivitiService, ApplicationContextAware {
    
        @Autowired protected TaskService taskService;
    
        @Autowired protected FormService formService;
    
        @Autowired protected HistoryService historyService;
    
        @Autowired protected IdentityService identityService;
    
        @Autowired protected ManagementService managementService;
    
        @Autowired protected RepositoryService repositoryService;
    
        @Autowired protected RuntimeService runtimeService;
    
        @Autowired private IActRunTaskRepository actRunTaskRepository;
    
        private static ApplicationContext APPLICATION_CONTEXT;
    
        protected ApplicationContext applicationContext() {
            return APPLICATION_CONTEXT;
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            APPLICATION_CONTEXT = applicationContext;
        }
    
        /**
         * 统计指定用户的待办事项数量
         *
         * @param userIds
         * @return
         */
        public List<ActivitiCountGroupByResult> countUnassignedTaskGroupByUserId(List<String> userIds) {
            List<Map<String, Object>> list = actRunTaskRepository.countRunTaskForEachOne(userIds);
            if (CollectionUtils.isEmpty(list)) {
                return null;
            }
            Map<String, Integer> userTaskMap = Maps.newHashMap();
            for (Map<String, Object> e : list) {
                String userId = CastUtil.castString(e.get("userId"));
                Integer taskNum = CastUtil.castInt(e.get("taskNum"));
                userTaskMap.put(userId, taskNum);
            }
            List<ActivitiCountGroupByResult> result = Lists.newArrayList();
            for (String userId : userIds) {
                Integer taskNum = userTaskMap.getOrDefault(userId, 0);
                result.add(new ActivitiCountGroupByResult(userId, taskNum));
            }
            return result;
        }
    
        /**
         * 待办事项最少的人
         *
         * @param userIds
         * @return
         */
        public ActivitiCountGroupByResult minUnassignedTask(List<String> userIds) {
            return countUnassignedTask(userIds, false);
        }
        /**
         * 待办事项最多的人
         *
         * @param userIds
         * @return
         */
        public ActivitiCountGroupByResult maxUnassignedTask(List<String> userIds) {
            return countUnassignedTask(userIds, true);
        }
    
        /**
         * 待办事项最少或最多的人
         *
         * @param userIds
         * @return
         */
        private ActivitiCountGroupByResult countUnassignedTask(List<String> userIds, boolean isMax) {
            List<ActivitiCountGroupByResult> list = countUnassignedTaskGroupByUserId(userIds);
            if (CollectionUtils.isEmpty(list)) {
                // 都没有待办事项
                return null;
            }
            if (isMax) {
                return Collections.max(
                        list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
            } else {
                return Collections.min(
                        list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
            }
        }
    
        @Override
        public Task queryUnassignedTask(String taskId) {
            return taskService.createTaskQuery().taskId(taskId).singleResult();
        }
    
        /**
         * 通过taskId查询task
         *
         * @param taskId
         * @return
         */
        @Override
        public Task queryUnassignedTask(String taskId, String userId) {
            return taskService
                    .createTaskQuery()
                    .taskId(taskId)
                    .taskCandidateOrAssigned(userId)
                    .singleResult();
        }
    
    
        /**
         * 查询待办任务
         *
         * @param userId
         * @return
         */
        @Override
        public List<Task> queryUnassignedTaskByUser(
                String userId, List<String> processInstaceIds, final int page, int size) {
            int size_val = resetSize(size);
            int firstResult = countFirstResult(page, size_val);
            TaskQuery taskQuery = taskService.createTaskQuery().taskCandidateOrAssigned(userId);
            if (!CollectionUtils.isEmpty(processInstaceIds)) {
                taskQuery = taskQuery.processInstanceIdIn(processInstaceIds);
            }
            return taskQuery.orderByTaskCreateTime().desc().listPage(firstResult, size_val);
        }
    
        /**
         * 计算第一条结果的索引
         *
         * @param page
         * @param size
         * @return
         */
        private int countFirstResult(final int page, final int size) {
            int startIndex = 1;
            // 如果减1后还小于等于0,就重置为0(因为达成统一规定:分页的第一页page指定从1开始,但是jpa中是从0开始)
            startIndex = page - startIndex;
            int page_val = startIndex <= 0 ? 0 : startIndex;
            return page_val * size;
        }
    
        /**
         * 计算每页大小
         *
         * @param size
         * @return
         */
        private int resetSize(final int size) {
            return size <= 0 ? 10 : size;
        }
    
        /**
         * 查询已办任务
         *
         * @param userId
         * @param page
         * @param size
         * @return
         */
        @Override
        public List<HistoricTaskInstance> queryDoneTasks(String userId, int page, int size) {
            int size_val = resetSize(size);
            int firstResult = countFirstResult(page, size_val);
            return historyService
                    .createHistoricTaskInstanceQuery()
                    .taskAssignee(userId)
                    .finished()
                    .orderByTaskCreateTime()
                    .desc()
                    .listPage(firstResult, size_val);
        }
    
        /**
         * 统计待办事项数量
         *
         * @param userId
         * @return
         */
        @Override
        public long countUnassignedTask(String userId) {
            return taskService.createTaskQuery().taskCandidateOrAssigned(userId).count();
        }
    
        /**
         * 统计已办事项数量
         *
         * @param userId
         * @return
         */
        public long countDoneTask(String userId) {
            return historyService.createHistoricTaskInstanceQuery().taskAssignee(userId).count();
        }
    
        /**
         * 查询当前流程没有办理人的任务
         *
         * @param processInstanceId
         * @return
         */
        @Override
        public List<Task> currentUnassignedTasks(String processInstanceId) {
            return taskService
                    .createTaskQuery()
                    .processInstanceId(processInstanceId)
                    .taskUnassigned()
                    .list();
        }
    
        /**
         * 查询当前没有被办理的任务
         *
         * @param processInstanceId
         * @return
         */
        @Override
        public Task currentUndoneTask(String processInstanceId) {
            return taskService
                    .createTaskQuery()
                    .processInstanceId(processInstanceId)
                    .singleResult();
        }
    }
    

    从下面开始,我们就要开始做具体的区分了,开始区分流程实例任务处理器

    • 流程实例

      // 考虑到每一个流程都有一个开始节点,那么我就抓住它们的共同点,做了一个抽象类,供所有的实例类来继承
      public abstract class BaseProcessServiceImpl extends AbstractElement {
      
          private ProcessDefinition processDefinition;
      
          private BpmnModel bpmnModel;
      
          private StartEvent startEvent;
      
          private Collection<FlowElement> xmlNodeElements;
      
          private static final String START_EVENT_KEY = "0";
            // 通过IOC容器获取所有的用户任务处理器对象;后续会提到UserTaskHandler
          @Autowired private Map<String, UserTaskHandler> userTaskHandlers;
            // 持久化工具;专门用来存储数据
          @Autowired private ActivitiBusinessDataRepository activitiBusinessDataRepository;
            // 初始化工作;这些解析出来的属性都是为了方便以后扩展使用。
          @PostConstruct
          private void init() {
                // 把流程定义解析出来
              processDefinition =
                      repositoryService
                              .createProcessDefinitionQuery()
                              .processDefinitionKey(processDefinitionKey())
                              .latestVersion()
                              .singleResult();
                // 解析出bpmn模型
              bpmnModel = repositoryService.getBpmnModel(processDefinition.getId());
              Process process = bpmnModel.getProcessById(processDefinitionKey());
                // 解析出所有的xml节点元素
              this.xmlNodeElements = process.getFlowElements();
                // 解析出启动节点
              this.startEvent = (StartEvent) process.getInitialFlowElement();
          }
      
          /**
           * 启动流程实例
           *
           * @param businessKey
           * @param executor 执行人
           * @param variables 需要放进流程实例中的变量值
           * @param data 用来贯穿整个方法,可以把数据放在里面,最后在调用的方法中取出来使用
           * @return
           * @throws GlobalException
           */
          @Transactional(rollbackFor = Exception.class)
          public ProcessInstance startProcessInstance(
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data)
                  throws GlobalException {
              Preconditions.checkState(StringUtils.isNotBlank(businessKey), "businessKey不能为空");
                // 特地创建一个map对象,用作启动流程实例的参数;注意要与data区分开,data是不会与流程实例打交道的,但是data的作用是用来贯穿整个方法的,专门用于数据传递;也就是说,data里面的数据不会弄脏流程实例。
              Map<String, Object> allVariables = Maps.newHashMap();
              if (!CollectionUtils.isEmpty(variables)) {
                  allVariables.putAll(variables);
              }
                // processDefinitionKey是一个抽象方法,需要子类实现,不同的流程定义不一样
              String processDefinitionKey = processDefinitionKey();
              allVariables.put(Const.PROCESS_DEFINITION_KEY, processDefinitionKey);
              // 前置操作
              beforeStartProcessInstance(processDefinitionKey, businessKey, executor, allVariables, data);
                // 启动流程实例
              ProcessInstance processInstance;
              if (CollectionUtils.isEmpty(allVariables)) {
                  processInstance =
                          runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey);
              } else {
                  processInstance =
                          runtimeService.startProcessInstanceByKey(
                                  processDefinitionKey, businessKey, allVariables);
              }
                // 后置操作
              afterStartProcessInstance(processInstance, businessKey, executor, variables, data);
              return processInstance;
          }
      
            // 前置操作
          protected void beforeStartProcessInstance(
                  String processDefinitionKey,
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data)
                  throws GlobalException {
                // 通过startEventKey获取启动节点
              UserTaskHandler userTaskHandler = userTaskHandlers.get(startEventKey());
              if (Objects.nonNull(userTaskHandler)) {
                    // 设置启动节点的执行人
                  identityService.setAuthenticatedUserId(executor.getId());
                    // 同样的引入任务处理前置操作
                  userTaskHandler.beforeTask(null, businessKey, executor, variables, data);
              }
                // 读取一些自定义配置,把配置信息存储进variables;这些配置将被用在启动流程实例,这样就可以在这个流程实例存活的任意时刻和节点获取相关配置,方便后续扩展;比如后续可以扩展动态指派任务、待办任务消息提示等业务功能
              activitiBusinessDataRepository.readProcessInstanceVariable(
                      processDefinitionKey(), startEventKey(), variables);
          }
      
            // 后置操作
          protected void afterStartProcessInstance(
                  ProcessInstance processInstance,
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data) {
              Timestamp now = new Timestamp(System.currentTimeMillis());
              String processInstanceId = processInstance.getId();
              String processDefinitionKey = processInstance.getProcessDefinitionKey();
              String processDefinitionName = processInstance.getProcessDefinitionName();
              String startUserId = processInstance.getStartUserId();
              long startTime = processInstance.getStartTime().getTime();
              TaskNodeInfo taskNodeInfo =
                      TaskNodeInfo.newInstance(START_EVENT_KEY, startEventName(), startEventKey());
                // 根据任务节点taskDefinitionKey获取指定的UserTaskHandler
              UserTaskHandler userTaskHandler = userTaskHandlers.get(taskNodeInfo.getDefinitionKey());
      
              List<FieldInfo> processInstanceDataList = null;
              List<FieldInfo> taskNodeDataList = null;
              if (Objects.nonNull(userTaskHandler)) {
                  // 获取审核结果和建议;默认用户任务都会有审核结果和建议。
                  Object result = userTaskHandler.approvalResult(variables, data);
                  if (Objects.nonNull(result)) {
                      data.put(Approval.APPROVAL_RESULT.name(), result);
                  }
                  Object opinion = userTaskHandler.approvalOpinion(variables, data);
                  if (Objects.nonNull(opinion)) {
                      data.put(Approval.APPROVAL_OPINION.name(), opinion);
                  }
                  // 用户任务中,需要保存到流程实例中的数据
                  processInstanceDataList =
                          userTaskHandler.processInstanceData(
                                  processInstance, executor, businessKey, variables, data);
                  // 用户任务中,需要保存到节点快照的数据
                  taskNodeDataList =
                          userTaskHandler.taskNodeData(
                                  processInstance, executor, businessKey, variables, data);
              }
                // 和上面两个对应,下面是程序自动保存的数据,不依赖于开发者
              // 自动保存的流程实例数据
              List<FieldInfo> autoSaveProcessInstanceData =
                      activitiBusinessDataRepository.getAutoSaveBusinessInstanceVariables(
                              processDefinitionKey, processDefinitionName, startUserId, startTime);
              // 自动保存的流程节点数据
              List<FieldInfo> autoSaveTaskNodeVariables =
                      activitiBusinessDataRepository.getAutoSaveTaskNodeVariables(
                              executor, businessKey, data);
      
              // 保存该流程实例数据
              activitiBusinessDataRepository.saveProcessInstanceData(
                      processInstanceDataList,
                      autoSaveProcessInstanceData,
                      processInstanceId,
                      businessKey,
                      now);
              // 保存节点快照数据
              activitiBusinessDataRepository.saveTaskNodeData(
                      autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now);
              // 保存节点历史数据;历史数据属于自动保存的,不需要开发者操心。
              activitiBusinessDataRepository.saveHistory(
                      businessKey, processInstanceId, executor, taskNodeInfo, data, now);
          }
      
          /**
           * 所有任务办理入口
           * @description
           */
          @Transactional(rollbackFor = GlobalException.class)
          public Task completeTask(
                  String taskId,
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data)
                  throws GlobalException {
                // 通过taskId作为查询参数,还有一个就是当前办理人的id
              Task task = queryUnassignedTask(taskId, executor.getId());
              if (Objects.isNull(task)) {
                  throw GlobalException.newInstance("TASK_NOT_EXIST", "任务不存在");
              }
                // 同上面的流程启动;作为办理任务的参数容器
              Map<String, Object> allVariables = Maps.newHashMap();
              if (!CollectionUtils.isEmpty(variables)) {
                  allVariables.putAll(variables);
              }
                // 前置操作
              beforeCompleteTask(task, businessKey, executor, allVariables, data);
                // 任务办理
              if (CollectionUtils.isEmpty(allVariables)) {
                  taskService.complete(task.getId());
              } else {
                  taskService.complete(task.getId(), allVariables);
              }
                // 后置操作
              afterCompleteTask(task, businessKey, executor, allVariables, data);
              return task;
          }
      
            // 前置操作
          protected void beforeCompleteTask(
                  Task task,
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data)
                  throws GlobalException {
                // 获取当前任务的taskDefinitionKey
              String taskDefinitionKey = task.getTaskDefinitionKey();
                // 查询目标UserTaskHandler
              UserTaskHandler userTaskHandler = userTaskHandlers.get(taskDefinitionKey);
              if (Objects.nonNull(userTaskHandler)) {
                    // 执行任务处理的前置操作
                  userTaskHandler.beforeTask(task, businessKey, executor, variables, data);
              }
                // 读取该节点相关配置;只有执行到该节点,才会读取该节点的配置,而不是一次性读取流程的所有配置;注意按需加载
              activitiBusinessDataRepository.readProcessInstanceVariable(
                      processDefinitionKey(), taskDefinitionKey, variables);
          }
      
            // 后置操作
          protected void afterCompleteTask(
                  Task task,
                  String businessKey,
                  Executor executor,
                  Map<String, Object> variables,
                  Map<String, Object> data) {
              Timestamp now = new Timestamp(System.currentTimeMillis());
              String processInstanceId = task.getProcessInstanceId();
              TaskNodeInfo taskNodeInfo =
                      TaskNodeInfo.newInstance(task.getId(), task.getName(), task.getTaskDefinitionKey());
                // 拿到UserTaskHandler
              UserTaskHandler userTaskHandler = userTaskHandlers.get(taskNodeInfo.getDefinitionKey());
                // 说白了,下面还是准备数据,因为后面需要把数据作持久化
              List<FieldInfo> processInstanceDataList = null;
              List<FieldInfo> taskNodeDataList = null;
              if (Objects.nonNull(userTaskHandler)) {
                  // 审核意见和建议
                  Object result = userTaskHandler.approvalResult(variables, data);
                  if (Objects.nonNull(result)) {
                      data.put(Approval.APPROVAL_RESULT.name(), result);
                  }
                  Object opinion = userTaskHandler.approvalOpinion(variables, data);
                  if (Objects.nonNull(opinion)) {
                      data.put(Approval.APPROVAL_OPINION.name(), opinion);
                  }
                  // 流程实例数据
                  processInstanceDataList =
                          userTaskHandler.processInstanceData(
                                  task, executor, businessKey, variables, data);
                  // 节点数据
                  taskNodeDataList =
                          userTaskHandler.taskNodeData(task, executor, businessKey, variables, data);
              }
              // 自动保存的流程节点数据
              List<FieldInfo> autoSaveTaskNodeVariables =
                      activitiBusinessDataRepository.getAutoSaveTaskNodeVariables(
                              executor, businessKey, data);
                // 数据准备好后,下面开始持久化
              // 保存流程实例数据
              activitiBusinessDataRepository.saveProcessInstanceData(
                      processInstanceDataList, null, processInstanceId, businessKey, now);
              // 保存节点快照数据
              activitiBusinessDataRepository.saveTaskNodeData(
                      autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now);
              // 保存节点历史数据;其实是当前节点的时间、节点、办理人等相关信息,结合上面的快照数据,就可以完全恢复当前节点办理的任务
              activitiBusinessDataRepository.saveHistory(
                      businessKey, processInstanceId, executor, taskNodeInfo, data, now);
          }
      
          /**
           * 指定processDefinitionKey,需要子类实现
           *
           * @return
           */
          protected abstract String processDefinitionKey();
      
          /**
           * 指定startEventKey
           *
           * @return
           */
          protected String startEventKey() {
              return getStartEvent().getId();
          }
      
          /**
           * 指定startEventName
           *
           * @return
           */
          protected String startEventName() {
              return getStartEvent().getName();
          }
      
          protected ProcessDefinition getProcessDefinition() {
              return this.processDefinition;
          }
      
          protected BpmnModel getBpmnModel() {
              return this.bpmnModel;
          }
      
          protected StartEvent getStartEvent() {
              return this.startEvent;
          }
      
          protected Collection<FlowElement> getXmlNodeElements() {
              return this.xmlNodeElements;
          }
      
            // 实现ExecutionListener前置操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法
          @Override
          protected void beforeExecute(
                  DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {}
        
        // 实现ExecutionListener的notify操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法
        @Override
          protected void execute(
                  DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
          }
      
        // 实现ExecutionListener后置操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法
          @Override
          protected void afterExecute(DelegateExecution execution, Map<String, Object> data) {
          }
          /**
           * 不能重写;在流程实例的实现中,不需要实现这个方法,只有任务处理器需要实现
           *
           * @param execution
           * @param data
           * @return
           */
          @Override
          protected final List<FieldInfo> processInstanceData(
                  DelegateExecution execution, Map<String, Object> data) {
              return null;
          }
      
          /**
           * 不能重写;在流程实例的实现中,不需要实现这个方法,只有任务处理器需要实现
           *
           * @param execution
           * @param data
           * @return
           */
          @Override
          protected final List<FieldInfo> taskNodeData(
                  DelegateExecution execution, Map<String, Object> data) {
              return null;
          }
      }
      

      大家一定要仔细看上面这段代码的注释,里面阐述了这样设计的原因和思路

      • 任务处理器

        // 实现AbstractElement,开始分支出任务处理器
        public abstract class AbstractTaskHandler extends AbstractElement {
        
            private static final long serialVersionUID = -1470919793703094859L;
            @Autowired
            private IBusinessNodeVariablesRepository businessNodeVariablesRepository;
        
                // 通过任务ID查询历史任务
            public Object historyTaskInfo(String taskId){
                List<TaskNodeVariables> historyDataList =
                        businessNodeVariablesRepository.findByTaskId(taskId);
                if (CollectionUtils.isEmpty(historyDataList)) {
                    return null;
                }
                Map<String, Object> map = Maps.newHashMap();
                for (TaskNodeVariables nodeVariables : historyDataList) {
                    map.put(nodeVariables.getFieldName(), nodeVariables);
                }
                return map;
            }
        }
        
        // 注意UserTaskHandler实现了UserTaskListener,一定要回头去看ActivitiListener
        public abstract class UserTaskHandler extends AbstractTaskHandler implements UserTaskListener {
            /**
             * 办理任务之前准备variables;同样的空实现,是为了避免子类一定要去写这个实现
             *
             * @param task
             * @param businessKey
             * @param executor
             * @param variables
             * @param data
             */
            public void beforeTask(
                    Task task,
                    String businessKey,
                    Executor executor,
                    Map<String, Object> variables,
                    Map<String, Object> data)
                    throws GlobalException {
            }
        
            /**
             * 这个是启动流程实例和完成待办任务的实现中都会调用的函数,自定义保存节点数据;同样的空实现,是为了避免子类一定要去写这个实现
             *
             * @param obj
             * @param executor
             * @param businessKey
             * @param variables
             * @return
             */
            public List<FieldInfo> taskNodeData(
                    Object obj,
                    Executor executor,
                    String businessKey,
                    Map<String, Object> variables,
                    Map<String, Object> data) {
                return null;
            }
        
            /**
             * 这个是启动流程实例和完成待办任务的实现中都会调用的函数,自定义保存实例数据;同样的空实现,是为了避免子类一定要去写这个实现
             *
             * @param obj
             * @param executor
             * @param businessKey
             * @param variables
             * @return
             */
            public List<FieldInfo> processInstanceData(
                    Object obj,
                    Executor executor,
                    String businessKey,
                    Map<String, Object> variables,
                    Map<String, Object> data) {
                return null;
            }
        
            /**
             * 审批结果;这个就是要求所有的用户任务必须要有审核结果
             *
             * @return
             */
            public abstract Object approvalResult(Map<String, Object> variables, Map<String, Object> data);
        
            /**
             * 审批意见;要求所有的用户任务必须要有审核意见
             *
             * @return
             */
            public abstract Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data);
        
            /**
             * Execute前置处理;同样的空实现,是为了避免子类一定要去写这个实现
             *
             * @param execution
             * @param variables
             * @param data
             */
            @Override
            protected void beforeExecute(
                    DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
            }
        
            /**
             * 后置处理;同样的空实现,是为了避免子类一定要去写这个实现
             *
             * @param execution
             * @param data
             */
            @Override
            protected final void afterExecute(DelegateExecution execution, Map<String, Object> data) {
            }
        
            // 这个基本没用,设计上还需要调整;同样的空实现,是为了避免子类一定要去写这个实现 
            @Override
            protected final List<FieldInfo> processInstanceData(
                    DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        
            // 这个基本没用,设计上还需要调整;同样的空实现,是为了避免子类一定要去写这个实现 
            @Override
            protected final List<FieldInfo> taskNodeData(
                    DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        
            // 这个就是实现ExecutionListener的空实现
            @Override
            protected void execute(
                    DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
            }
        
            // 如果是任务被创建的事件的话,那么先设置办理人,这就是动态实现设置办理人的思路;有需要可以扩展其他功能。
            @Override
            public void notify(DelegateTask delegateTask) {
                if (StringUtils.equalsIgnoreCase(delegateTask.getEventName(), "create")) {
                    setAssignee(delegateTask);
                }
            }
        
            /**
             * 设置办理人
             *
             * @param delegateTask
             */
            protected void setAssignee(DelegateTask delegateTask) {
                // 获取processDefinitionKey
                String processDefinitionKey =
                        CastUtil.castString(delegateTask.getVariable(Const.PROCESS_DEFINITION_KEY));
                // 获取taskDefinitionKey
                String taskDefinitionKey = delegateTask.getTaskDefinitionKey();
                Map<String, Object> variables = Maps.newHashMap();
                // 查询节点配置
                readTransactorAndSendNoticeVariable(processDefinitionKey, taskDefinitionKey, variables);
                // 根据配置查询办理人;TODO
                String assignee = "";
                delegateTask.setAssignee(assignee);
            }
          
          // 需要特殊处理的启动节点;我把它当用户任务处理掉
          public abstract static class StartEventTaskHandler extends UserTaskHandler {
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final void notify(DelegateTask delegateTask) {
                }
        
                @Override
                public List<FieldInfo> taskNodeData(
                        Object obj,
                        Executor executor,
                        String businessKey,
                        Map<String, Object> variables,
                        Map<String, Object> data) {
                    return null;
                }
        
                @Override
                public List<FieldInfo> processInstanceData(
                        Object obj,
                        Executor executor,
                        String businessKey,
                        Map<String, Object> variables,
                        Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final Object approvalResult(Map<String, Object> variables, Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) {
                    return null;
                }
            }
          // 需要特殊处理的结束节点;我把它当用户任务处理掉
          public abstract static class EndEventTaskHandler extends UserTaskHandler {
                    // 把不可能调用的直接final处理掉
                @Override
                public final List<FieldInfo> taskNodeData(
                        Object obj,
                        Executor executor,
                        String businessKey,
                        Map<String, Object> variables,
                        Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final List<FieldInfo> processInstanceData(
                        Object obj,
                        Executor executor,
                        String businessKey,
                        Map<String, Object> variables,
                        Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final Object approvalResult(Map<String, Object> variables, Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) {
                    return null;
                }
        
                    // 把不可能调用的直接final处理掉
                @Override
                public final void notify(DelegateTask delegateTask) {
                }
            }
        }
        
        // 这个类和ActivitiListener类似的原理,但是是针对ServiceTask的,因为ServiceTask组件需要指定一个JavaDelegate的实现
        public class ServiceTaskDelegate implements JavaDelegate {
        
            @Override
            public void execute(DelegateExecution execution) {
                // 获取组件id
                String taskDefinitionKey = execution.getCurrentActivityId();
                // 查找ServiceTaskHandler对象
                ServiceTaskHandler serviceTaskHandler =
                        ApplicationContextUtil.getBean(taskDefinitionKey, ServiceTaskHandler.class);
                if (Objects.nonNull(serviceTaskHandler)) {
                    // 调用目标方法
                    serviceTaskHandler.notify(execution);
                }
            }
        }
        
        // 这个是系统任务处理器,activiti里面还有一个ServiceTask的组件
        public abstract class ServiceTaskHandler extends AbstractTaskHandler {
        
            // 重写ExecutionListener后置操作,保存数据
            @Override
            protected void afterExecute(DelegateExecution execution, Map<String, Object> data) {
                saveData(execution, data);
            }
            
            // 空实现
            @Override
            protected List<FieldInfo> processInstanceData(
                    DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        
            // 空实现
            @Override
            protected List<FieldInfo> taskNodeData(DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        
            // 实现ExecutionListener的操作
            @Override
            protected void execute(
                    DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
                Object result = approvalResult(execution, variables, data);
                Object opinion = approvalOpinion(execution, variables, data);
                if (Objects.nonNull(result)) {
                    data.put(Approval.APPROVAL_RESULT.name(), result);
                }
                if (Objects.nonNull(opinion)) {
                    data.put(Approval.APPROVAL_OPINION.name(), opinion);
                }
            }
        
            /**
             * 审批结果
             *
             * @return
             */
            public abstract Object approvalResult(
                    DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data);
        
            /**
             * 审批意见
             *
             * @return
             */
            public abstract Object approvalOpinion(
                    DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data);
        }
        
        // 网关任务处理器
        public abstract class GateTaskHandler extends AbstractElement {
        
            private static final long serialVersionUID = -1425610003326612058L;
        
            // 空实现
            @Override
            protected void afterExecute(DelegateExecution execution, Map<String, Object> data) {}
        
            // 把不可能调用的直接final处理掉
            @Override
            protected final List<FieldInfo> processInstanceData(
                    DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        
            // 把不可能调用的直接final处理掉
            @Override
            protected final List<FieldInfo> taskNodeData(
                    DelegateExecution execution, Map<String, Object> data) {
                return null;
            }
        }
        

        按照这个,我把最主要的UserTask、ServiceTask等几个常见的任务给处理掉了。下面来看一下这些抽象类都怎么使用。

    具体实现

    // "leave"是流程图的id
    @Service("leave")
    public class LeaveServiceImpl extends BaseProcessServiceImpl
            implements ILeaveService {
      /**
         * 流程定义key
         */
        private static final String LEAVE_PROCESS_DEFINITION_KEY = "leave";
    
        @Override
        protected String processDefinitionKey() {
            return LEAVE_PROCESS_DEFINITION_KEY;
        }
    
        // 接口方法,需要实现,申请假期
        @Transactional(rollbackFor = Exception.class)
        @Override
        public String apply(Object data, String userId) throws GlobalException {
          // 准备数据TODO
          // 调用BaseProcessServiceImpl启动方法启动流程实例
          ProcessInstance processInstance =
                    startProcessInstance(businessKey, Executor.defaultExecutor(userId),
                            variables, data);
            return processInstance.getId();
        }
      
        // 接口方法,需要实现,审批业务
        @Transactional(rollbackFor = Exception.class)
        @Override
        public String approval(Object approval, String userId)
                throws GlobalException {
            // 准备数据
            Map<String, Object> variables = handleApprovalVariables(approval, userId);
            Map<String, Object> data = handleApprovalData(approval);
            // 调用父类的办理方法
            Task task =
                    completeTask(
                            approval.getTaskId(), approval.getBusinessKey(),
                            Executor.defaultExecutor(userId), variables, data);
            return task.getId();
        }
    }
    
    // 启动节点ID
    @Component("leave_apply")
    public class ApplyTask extends UserTaskHandler.StartEventTaskHandler {
      
        // 任务前置操作
        @Override
        public void beforeTask(
                Task task,
                String businessKey,
                Executor executor,
                Map<String, Object> variables,
                Map<String, Object> data)
                throws GlobalException {
        }
    
        // 任务数据快照
        @Override
        public List<FieldInfo> taskNodeData(
                Object obj,
                Executor executor,
                String businessKey,
                Map<String, Object> variables,
                Map<String, Object> data) {
            if (CollectionUtils.isEmpty(data)) {
                return null;
            }
            FieldInfo.Builder builder = new FieldInfo.Builder();
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                String value = CastUtil.castString(entry.getValue(), StringUtils.EMPTY);
                builder.normalFieldInfo(entry.getKey(), value, "");
            }
            return builder.build();
        }
    
        // 保存在流程实例中的数据
        @Override
        public List<FieldInfo> processInstanceData(
                Object obj,
                Executor executor,
                String businessKey,
                Map<String, Object> variables,
                Map<String, Object> data) {
            String leave_reason = CastUtil.castString(variables.get("leave_reason"), StringUtils.EMPTY);
            String leave_date = CastUtil.castString(variables.get("leave_date"), StringUtils.EMPTY);
            return new FieldInfo.Builder()
                    .normalFieldInfo(
                            "leave_reason", leave_reason, "请假原因")
                    .normalFieldInfo("leave_date", leave_date, "请假日期")
                    .build();
        }
    
        @Override
        protected void execute(
                DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
        }
    }
    
    // 审批节点id
    @Component("leave_approval")
    public class UserApprovalTask extends UserTaskHandler {
    
        @Override
        public void beforeTask(
                Task task,
                String businessKey,
                Executor executor,
                Map<String, Object> variables,
                Map<String, Object> data)
                throws GlobalException {
        }
    
        // 节点快照数据;这里我存储的是审批数据
        @Override
        public List<FieldInfo> taskNodeData(
                Object obj,
                Executor executor,
                String businessKey,
                Map<String, Object> variables,
                Map<String, Object> data) {
    
            String approvalUserId = CastUtil.castString(variables.get("approvalUserId"), StringUtils.EMPTY);
            String approvalResult = CastUtil.castString(data.get("approvalResult"), StringUtils.EMPTY);
            String approvalOpinion = CastUtil.castString(data.get("approvalOpinion"), StringUtils.EMPTY);
    
            FieldInfo.Builder builder = new FieldInfo.Builder();
            builder.normalFieldInfo("approvalUserId", approvalUserId, "审批人ID")
                    .normalFieldInfo("approvalResult", approvalResult, "审批结果")
                    .normalFieldInfo("approvalOpinion", approvalOpinion, "审批意见");
    
            return builder.build();
        }
    
        // 同样的,我把需要的数据存储进流程实例中
        @Override
        public List<FieldInfo> processInstanceData(
                Object obj,
                Executor executor,
                String businessKey,
                Map<String, Object> variables,
                Map<String, Object> data) {
    
            String approvalUserId = CastUtil.castString(variables.get("approvalUserId"), StringUtils.EMPTY);
            String approvalResult = CastUtil.castString(variables.get("approvalResult"), StringUtils.EMPTY);
            String approvalOpinion = CastUtil.castString(variables.get("approvalOpinion"), StringUtils.EMPTY);
    
            return new FieldInfo.Builder()
                    .normalFieldInfo(
                            "approvalUserId",
                            approvalUserId,
                            "审批人ID")
                    .normalFieldInfo("approvalResult",
                            approvalResult,
                            "审批结果")
                    .normalFieldInfo(
                            "approvalOpinion",
                            approvalOpinion,
                            "审批意见")
                    .build();
        }
    
        // 审批结果
        @Override
        public Object approvalResult(Map<String, Object> variables, Map<String, Object> data) {
            return data.getOrDefault("approvalResult", StrUtil.EMPTY);
        }
    
        // 审批意见
        @Override
        public Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) {
            return data.getOrDefault("approvalOpinion", StrUtil.EMPTY);
        }
    
        // 自定义设置办理人
        @Override
        protected void setAssignee(DelegateTask delegateTask) {
            Object approvalUserId = delegateTask.getVariable("approvalUserId");
            if (Objects.isNull(approvalUserId)) {
                super.setAssignee(delegateTask);
            } else {
                String userId = CastUtil.castString(approvalUserId);
                delegateTask.setAssignee(userId);
            }
        }
    }
    

    通过以上代码及相关注释,我已经把我对于activiti工作量引擎的使用与思考总结起来了,感兴趣的小伙伴可以参与一起讨论。

    相关文章

      网友评论

          本文标题:Activiti工作流引擎的使用、思考与总结

          本文链接:https://www.haomeiwen.com/subject/khtxyltx.html