背景:
- 公司系统上有很多任务流程要一连串的执行,而且任务之间还有很多依赖关系。比如,A任务执行完了以后,B任务开始执行,但B跟C同时完成的情况下,才能执行D任务。简单的办法,我们可以写几个定时任务,各自分好时段,你跑完了,我再跑。但是这样的话,一整套流程跑下来,执行等待的时间会很长很长。那我们可以不间断的,像加了三通的水管里的水一样,流下去执行吗?答案是,可以的。spring家族的事件驱动型框架有好几个,结合MQ(官方首推kafka跟rabbit)之后,还是能玩得转的。
其实这东西还是比较好理解,任务之间的关系,通过MQ消息队列来进行传递维系。我执行完了往队列里放一条消息,告诉下一个,你可以开始跑了。实现这些都比较简单,利用官方提供的手册,研究研究还是能很快捯饬出来。但是,只有当B任务跟C任务同时完成后,哪怕B先完成了,也得等C执行完,这种情况下,才能往下执行D,这个怎么控制呢?我们知道,一般mq就两种模式,发布订阅跟生产消费,想了半天也没弄明白怎么利用这两模式完成这个控制动作。后来想了想,既然控制不了消息的订阅与消费,那就控制方法的执行,退而求其次呗。
脑子转到这,就想到了aop,利用环绕通知来进行控制,结合redis做个中转,再统计下各个任务的执行情况后进行判断代码是否继续往下执行。嗯.....,感觉行得通,来试试效果怎么样。
首先,我们得把这个依赖关系先解决,很显然,D是依赖B跟C的。在下面代码里,步骤4的执行,是依赖步骤2跟步骤3的,这个依赖关系设置,为了方便我做了个注解,往下看代码。我们把任务先枚举出来。
public enum TaskEnum {
STEPONEININIT(0,"步骤:1"),
STEPTWOINIT(1,"步骤:2"),
STEPTHREEINIT(2,"步骤:3"),
STEPFOURINIT(3,"步骤:4")
;
private int taskId;
private String taskName;
TaskEnum(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
}
接下来是注解,
/**
*
* @author yangpin
*
*/
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface TaskDependsOn {
@NotNull
TaskEnum[] taskEnum();
}
然后是关键的aop,用的环绕通知,
/**
* @Author yangpin
* @Desciption aop处理
* @Date 1:58 2020-03-26
**/
@Aspect
@Component
public class TaskListenerAspect {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String TaskStreamMonitorQueue = "TaskMonitor:TaskStreamMonitorQueue";
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* @Author yangpin
* @Desciption 参数处理
* @Date 22:16 2020-03-18
* @Param [joinPoint]
* @return com.demo.aop.TaskListenerAspect.Params
**/
public Params getAnnotationValue(JoinPoint joinPoint) throws Exception {
String targetName = joinPoint.getTarget().getClass().getName();
String methodName = joinPoint.getSignature().getName();
Object[] arguments = joinPoint.getArgs();
Class targetClass = Class.forName(targetName);
Method[] methods = targetClass.getMethods();
Params params = new Params();
for (Method method : methods) {
if (method.getName().equals(methodName)) {
Class[] classes = method.getParameterTypes();
if (classes.length == arguments.length) {
TaskEnum[] taskEnums = method.getAnnotation(TaskDependsOn.class).taskEnum();
Task task = (Task) arguments[0];
params.setTask(task);
params.setTaskEnums(taskEnums);
break;
}
}
}
return params;
}
@Pointcut("execution(* com.demo..*(..)) && @annotation(com.demo.aop.TaskDependsOn)")
public void TaskListener() {}
@Around("TaskListener()")
public void round(ProceedingJoinPoint joinPoint) throws Throwable {
Params params = getAnnotationValue(joinPoint);
//判断方法是否执行
if (processInputArg(params))
joinPoint.proceed();
}
/**
* @Author yangpin
* @Desciption 利用redis暂存任务的执行情况
* @Date 1:52 2020-03-26
* @Param [params]
* @return boolean
**/
private boolean processInputArg(Params params) {
boolean proceed = false;
List<String> keys = new ArrayList<>();
//为了确保任务批次唯一,可以依照自己的业务规则,进行key值设定
String key = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + params.getTask().getTaskId();
stringRedisTemplate.opsForValue().set(key,params.getTask().toString());
TaskEnum[] taskEnums = params.getTaskEnums();
for (int i = 0; i < taskEnums.length; i++) {
String tmpKey = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + taskEnums[i].getTaskId();
keys.add(tmpKey);
}
List<String> strings = stringRedisTemplate.opsForValue().multiGet(keys);
strings.removeAll(Collections.singleton(null));
//TODO 从注解上获取任务之间的依赖关系,当最后一个依赖的任务执行完毕并放入redis后,取出所有依赖的任务进行统计
//TODO 注解依赖的任务数 == redis中已经执行完的任务数 ---> 可以放行
if (strings.size() == taskEnums.length) proceed = true;
return proceed;
}
class Params{
TaskEnum[] taskEnums;
Task task;
public TaskEnum[] getTaskEnums() {
return taskEnums;
}
public void setTaskEnums(TaskEnum[] taskEnums) {
this.taskEnums = taskEnums;
}
public Task getTask() {
return task;
}
public void setTask(Task task) {
this.task = task;
}
public Params() {
}
public Params(TaskEnum[] taskEnums, Task task) {
this.taskEnums = taskEnums;
this.task = task;
}
}
}
我们来看下注解的用法。这是任务“步骤:4”的处理类,为了能在页面实时监控每一步任务执行了多少,用了redis的队列进行处理。
@EnableBinding(DemoBinding.class)
public class TaskSink {
private static final Logger logger = LoggerFactory.getLogger(TaskSink.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
//填入该任务所依赖的两个任务枚举值,分别是"步骤:2"跟"步骤:3"
@TaskDependsOn(taskEnum = {TaskEnum.STEPTWOINIT,TaskEnum.STEPTWOINIT})
@StreamListener(DemoBinding.INPUT_3)
public void process(Task task) {
logger.info("stream-demo3收到消息,"+ task.toString()+",开始步骤:4!" );
task.setTaskId(3);
task.setTaskName("步骤:4");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepfourCreateQueue,task.toString());
stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepfourCreateQueue,task.toString());
} catch (Exception e) {
logger.error("步骤:4任务队列发送失败:" + task.toString());
}
}
});
}
}
好,再来看下“步骤:2”跟“步骤:3”是怎么处理的,
@EnableBinding(DemoBinding.class)
public class TaskProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessor.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
@StreamListener(DemoBinding.INPUT_1)
@SendTo({DemoBinding.OUTPUT_3})
public Task process(Task task) {
logger.info("stream-demo1,收到消息" + task.toString() + ",处理完毕!");
//任务id-1
task.setTaskId(1);
task.setTaskName("步骤:2");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
} catch (Exception e) {
logger.error("步骤:2任务队列发送失败:" + task.toString());
}
}
});
return task;
}
@StreamListener(DemoBinding.INPUT_2)
@SendTo({DemoBinding.OUTPUT_3})
public Task process2(Task task) {
logger.info("stream-demo2,收到消息" + task.toString() + ",处理完毕!");
//任务id-2
task.setTaskId(2);
task.setTaskName("步骤:3");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
} catch (Exception e) {
logger.error("步骤:3任务队列发送失败:" + task.toString());
}
}
});
return task;
}
}
我们继续往下看“步骤:1”,也就是源头任务,怎么处理的。
@EnableScheduling
@EnableBinding(DemoBinding.class)
public class TaskSender {
private static final Logger logger = LoggerFactory.getLogger(TaskSender.class);
@Autowired
private DemoBinding source;
@Autowired
StringRedisTemplate stringRedisTemplate;
protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
@Scheduled(initialDelay = 10000,fixedDelay = 3000)
public void sendEvents() {
Task task = new Task();
//任务id-0,为保证任务流程批次的唯一性,利用uuid设置streamId进行甄别
task.setStreamId(UUID.randomUUID().toString().replace("-",""));
task.setTaskId(0);
task.setTaskName("步骤:1");
task.setOrgansin("ABCD" + new Random().nextInt(700));
//完成,通知步骤1跟步骤2
this.source.output1().send(MessageBuilder.withPayload(task).build());
this.source.output2().send(MessageBuilder.withPayload(task).build());
logger.info("向stream-demo1,stream-demo2发送消息:"+ task.toString());
executorService.submit(new Runnable() {
@Override
public void run() {
try {
stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteponeCreateQueue,task.toString());
stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteponeCreateQueue,task.toString());
} catch (Exception e) {
e.printStackTrace();
logger.error("步骤:1任务队列发送失败:" + task.toString());
}
}
});
}
}
为了实时掌握任务的执行情况并反馈到页面,用了websocket,在这就不赘述了。
任务关系:1先执行完,通知2、3,然后2、3都完成了,跑4,看下最后的效果:
spring-cloud-stream-demo.gif
看起来是实现了这个效果。
网友评论