Demo项目代码:
https://gitee.com/AnAnXiEr/activiti6-demo
1、作业执行器的配置
2、配置自定义线程池
3、流程定义定时任务流程
作业执行器的配置
1、相关配置
- asyncExecutorActivate:激活作业执行器(默认是关的)
- asyncExecutorXXX:异步执行器的属性配置
- asyncExecutor:异步执行器bean
2、自定义线程池ExecutorService
- corePoolSize:核心线程数
- maxPoolSize:最大线程数
- queueCapacity:堵塞队列大小
线程池执行原理:
提交任务-》创建线程-》第二个任务提交-》开启新线程(直到线程数达到corePoolSize的数量)
-》再有新线程 -》检查核心线程里的线程是否都在运行 -》如果都在执行,则放到队列中
-》如果队列已经满了,还有任务提交,则又会创建新线程(直到线程数达到maxPoolSize数)
-》max也达到了,则拒绝新的任务提交
3、自定义线程池配置ExecutorService
![](https://img.haomeiwen.com/i6929381/b15271644359ed80.png)
rejectedExecutionHandler:拒绝策略
4、定时开始事件(Timer Start Event)
![](https://img.haomeiwen.com/i6929381/0a3bacf0747de5a7.png)
- timeDate:指定启动事件
- timeDuration:指定持续事件间隔后执行
- timeCycle:R5/P1DT1H 指定事件端后周期执行(流程部署后、1天1小时后开始执行,执行5次,间隔时间1天1小时)
代码
ConfigJobTest
package com.imooc.activiti.activitidemo;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.test.ActivitiRule;
import org.activiti.engine.test.Deployment;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNotNull;
/**
* @Description 数据库配置测试
* @Author 胡浩
* @Date 2019/8/16
**/
public class ConfigJobTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigJobTest.class);
//将启动activiti引擎构建好了
@Rule
public ActivitiRule activitiRule = new ActivitiRule("activiti_job.cfg.xml");
@Test
@Deployment(resources = "job-process.bpmn20.xml")
public void testConfig() throws InterruptedException {
LOGGER.info("start");
List<Job> jobs = activitiRule.getManagementService().createTimerJobQuery().listPage(0, 100);
for (Job job : jobs) {
LOGGER.info("定时任务{},默认重试次数 = {}", job, job.getRetries());
}
LOGGER.info("jobs.size = {}", jobs.size());
Thread.sleep(1000 * 100);
LOGGER.info("end");
}
}
job-process.bpmn20.xml
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath"
targetNamespace="http://www.activiti.org/processdef">
<process id="my-process" name="二级审批流程" isExecutable="true">
<documentation>MDC test process</documentation>
<!--<startEvent id="startEvent" name="开始"/>-->
<startEvent id="startEvent">
<timerEventDefinition>
<timeCycle>R5/PT10S</timeCycle>
</timerEventDefinition>
</startEvent>
<sequenceFlow id="sid-905AF026-9221-4C80-AF9A-29C581E59503" sourceRef="startEvent"
targetRef="tlApprove"/>
<userTask id="tlApprove" name="主管审批"/>
<!--<serviceTask id="tlApprove" activiti:class="com.imooc.activiti.activitidemo.delegage.MDCErrorServiceDelegate"/>-->
<sequenceFlow id="sid-905AF026-9221-4C80-AF9A-29C581E59505" sourceRef="tlApprove"
targetRef="endEventCancel"/>
<endEvent id="endEventCancel" name="取消"/>
</process>
</definitions>
activiti_job.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration"
id="processEngineConfiguration">
<property name="commandInvoker" ref="mdcCommandInvoker"/>
<!--打开线程池-->
<property name="asyncExecutorActivate" value="true"></property>
<property name="asyncExecutor" ref="asyncExecutor"></property>
<!--配置timer和job事件监听-->
<property name="eventListeners">
<list>
<bean class="com.imooc.activiti.activitidemo.event.JobEventListener"></bean>
</list>
</property>
</bean>
<bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
<property name="executorService" ref="executorService"></property>
</bean>
<bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
<property name="threadNamePrefix" value="activiti-job-"></property>
<property name="corePoolSize" value="5"/>
<property name="maxPoolSize" value="20"/>
<property name="queueCapacity" value="100"/>
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"></bean>
</property>
</bean>
<bean class="com.imooc.activiti.activitidemo.interceptor.MDCCommandInvoker" id="mdcCommandInvoker"/>
</beans>
JobEventListener
package com.imooc.activiti.activitidemo.event;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @Description 定时任务event
* @Author 胡浩
* @Date 2019/8/20
**/
public class JobEventListener implements ActivitiEventListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ActivitiEventListener.class);
@Override
public void onEvent(ActivitiEvent activitiEvent) {
ActivitiEventType eventType = activitiEvent.getType();
String name = eventType.name();
if (name.startsWith("TIMER") || name.startsWith("JOB")) {
LOGGER.info("监听TIMER事件 和 JOB事件 {} \t {}", eventType, activitiEvent.getProcessInstanceId());
}
}
@Override
public boolean isFailOnException() {
return false;
}
}
网友评论