美文网首页
任务的流转控制与监测-spring cloud stream

任务的流转控制与监测-spring cloud stream

作者: 风里有神通 | 来源:发表于2020-03-26 02:44 被阅读0次

    背景:

    • 公司系统上有很多任务流程要一连串的执行,而且任务之间还有很多依赖关系。比如,A任务执行完了以后,B任务开始执行,但B跟C同时完成的情况下,才能执行D任务。简单的办法,我们可以写几个定时任务,各自分好时段,你跑完了,我再跑。但是这样的话,一整套流程跑下来,执行等待的时间会很长很长。那我们可以不间断的,像加了三通的水管里的水一样,流下去执行吗?答案是,可以的。spring家族的事件驱动型框架有好几个,结合MQ(官方首推kafka跟rabbit)之后,还是能玩得转的。
      其实这东西还是比较好理解,任务之间的关系,通过MQ消息队列来进行传递维系。我执行完了往队列里放一条消息,告诉下一个,你可以开始跑了。实现这些都比较简单,利用官方提供的手册,研究研究还是能很快捯饬出来。但是,只有当B任务跟C任务同时完成后,哪怕B先完成了,也得等C执行完,这种情况下,才能往下执行D,这个怎么控制呢?我们知道,一般mq就两种模式,发布订阅跟生产消费,想了半天也没弄明白怎么利用这两模式完成这个控制动作。后来想了想,既然控制不了消息的订阅与消费,那就控制方法的执行,退而求其次呗。
      脑子转到这,就想到了aop,利用环绕通知来进行控制,结合redis做个中转,再统计下各个任务的执行情况后进行判断代码是否继续往下执行。嗯.....,感觉行得通,来试试效果怎么样。
      首先,我们得把这个依赖关系先解决,很显然,D是依赖B跟C的。在下面代码里,步骤4的执行,是依赖步骤2跟步骤3的,这个依赖关系设置,为了方便我做了个注解,往下看代码。我们把任务先枚举出来。
    public enum TaskEnum {
            STEPONEININIT(0,"步骤:1"),
            STEPTWOINIT(1,"步骤:2"),
            STEPTHREEINIT(2,"步骤:3"),
            STEPFOURINIT(3,"步骤:4")
            ;
    
            private int taskId;
            private String taskName;
    
            TaskEnum(int taskId, String taskName) {
                this.taskId = taskId;
                this.taskName = taskName;
            }
    
            public int getTaskId() {
                return taskId;
            }
    
            public void setTaskId(int taskId) {
                this.taskId = taskId;
            }
    
            public String getTaskName() {
                return taskName;
            }
    
            public void setTaskName(String taskName) {
                this.taskName = taskName;
            }
    }
    

    接下来是注解,

    
    /**
     * 
     * @author yangpin
     *
     */
    @Target({ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    public @interface TaskDependsOn {
    
        @NotNull
        TaskEnum[] taskEnum();
    
    }
    

    然后是关键的aop,用的环绕通知,

    
    /**
    * @Author yangpin
    * @Desciption aop处理
    * @Date 1:58 2020-03-26
    **/
    @Aspect
    @Component
    public class TaskListenerAspect {
     
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        public static final String TaskStreamMonitorQueue = "TaskMonitor:TaskStreamMonitorQueue";
    
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        /**
        * @Author yangpin
        * @Desciption 参数处理
        * @Date 22:16 2020-03-18
        * @Param [joinPoint]
        * @return com.demo.aop.TaskListenerAspect.Params
        **/
        public Params getAnnotationValue(JoinPoint joinPoint) throws Exception {
            String targetName = joinPoint.getTarget().getClass().getName();
            String methodName = joinPoint.getSignature().getName();
            Object[] arguments = joinPoint.getArgs();
            Class targetClass = Class.forName(targetName);
            Method[] methods = targetClass.getMethods();
            Params params = new Params();
            for (Method method : methods) {
                if (method.getName().equals(methodName)) {
                    Class[] classes = method.getParameterTypes();
                    if (classes.length == arguments.length) {
                        TaskEnum[] taskEnums = method.getAnnotation(TaskDependsOn.class).taskEnum();
                        Task task = (Task) arguments[0];
                        params.setTask(task);
                        params.setTaskEnums(taskEnums);
                        break;
                    }
                }
            }
            return params;
        }
    
    
        @Pointcut("execution(* com.demo..*(..))  && @annotation(com.demo.aop.TaskDependsOn)")
        public void TaskListener() {}
     
        @Around("TaskListener()")
        public void round(ProceedingJoinPoint joinPoint) throws Throwable {
            Params params = getAnnotationValue(joinPoint);
            //判断方法是否执行
            if (processInputArg(params))
                joinPoint.proceed();
        }
    
        /**
        * @Author yangpin
        * @Desciption 利用redis暂存任务的执行情况
        * @Date 1:52 2020-03-26
        * @Param [params]
        * @return boolean
        **/
        private boolean processInputArg(Params params) {
            boolean proceed = false;
    
            List<String> keys = new ArrayList<>();
    
            //为了确保任务批次唯一,可以依照自己的业务规则,进行key值设定
            String key = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + params.getTask().getTaskId();
    
            stringRedisTemplate.opsForValue().set(key,params.getTask().toString());
    
            TaskEnum[] taskEnums = params.getTaskEnums();
    
            for (int i = 0; i < taskEnums.length; i++) {
                String tmpKey = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + taskEnums[i].getTaskId();
                keys.add(tmpKey);
            }
    
            List<String> strings = stringRedisTemplate.opsForValue().multiGet(keys);
    
            strings.removeAll(Collections.singleton(null));
    
            //TODO 从注解上获取任务之间的依赖关系,当最后一个依赖的任务执行完毕并放入redis后,取出所有依赖的任务进行统计
            //TODO 注解依赖的任务数 == redis中已经执行完的任务数 ---> 可以放行  
            if (strings.size() == taskEnums.length) proceed = true;
    
            return proceed;
        }
    
    
    
        class Params{
    
            TaskEnum[] taskEnums;
    
            Task task;
    
            public TaskEnum[] getTaskEnums() {
                return taskEnums;
            }
    
            public void setTaskEnums(TaskEnum[] taskEnums) {
                this.taskEnums = taskEnums;
            }
    
            public Task getTask() {
                return task;
            }
    
            public void setTask(Task task) {
                this.task = task;
            }
    
            public Params() {
            }
    
            public Params(TaskEnum[] taskEnums, Task task) {
                this.taskEnums = taskEnums;
                this.task = task;
            }
        }
    
    
    }
    
    

    我们来看下注解的用法。这是任务“步骤:4”的处理类,为了能在页面实时监控每一步任务执行了多少,用了redis的队列进行处理。

    
    @EnableBinding(DemoBinding.class)
    public class TaskSink {
    
        private static final Logger logger = LoggerFactory.getLogger(TaskSink.class);
    
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            //填入该任务所依赖的两个任务枚举值,分别是"步骤:2"跟"步骤:3"
        @TaskDependsOn(taskEnum = {TaskEnum.STEPTWOINIT,TaskEnum.STEPTWOINIT})
        @StreamListener(DemoBinding.INPUT_3)
        public void process(Task task) {
            logger.info("stream-demo3收到消息,"+ task.toString()+",开始步骤:4!" );
            task.setTaskId(3);
            task.setTaskName("步骤:4");
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                        stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                    } catch (Exception e) {
                        logger.error("步骤:4任务队列发送失败:" + task.toString());
                    }
                }
            });
        }
    
    }
    

    好,再来看下“步骤:2”跟“步骤:3”是怎么处理的,

    
    @EnableBinding(DemoBinding.class)
    public class TaskProcessor {
    
        private static final Logger logger = LoggerFactory.getLogger(TaskProcessor.class);
    
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    
        @StreamListener(DemoBinding.INPUT_1)
        @SendTo({DemoBinding.OUTPUT_3})
        public Task process(Task task) {
            logger.info("stream-demo1,收到消息" + task.toString() + ",处理完毕!");
            //任务id-1
            task.setTaskId(1);
            task.setTaskName("步骤:2");
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                        stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                    } catch (Exception e) {
                        logger.error("步骤:2任务队列发送失败:" + task.toString());
                    }
                }
            });
            return task;
        }
    
        @StreamListener(DemoBinding.INPUT_2)
        @SendTo({DemoBinding.OUTPUT_3})
        public Task process2(Task task) {
            logger.info("stream-demo2,收到消息" + task.toString() + ",处理完毕!");
            //任务id-2
            task.setTaskId(2);
            task.setTaskName("步骤:3");
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                        stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                    } catch (Exception e) {
                        logger.error("步骤:3任务队列发送失败:" + task.toString());
                    }
                }
            });
            return task;
        }
    }
    

    我们继续往下看“步骤:1”,也就是源头任务,怎么处理的。

    
    @EnableScheduling
    @EnableBinding(DemoBinding.class)
    public class TaskSender {
    
        private static final Logger logger = LoggerFactory.getLogger(TaskSender.class);
    
        @Autowired
        private DemoBinding source;
    
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        protected static ExecutorService executorService = Executors.newSingleThreadExecutor();
    
        @Scheduled(initialDelay = 10000,fixedDelay = 3000)
        public void sendEvents() {
            Task task = new Task();
            //任务id-0,为保证任务流程批次的唯一性,利用uuid设置streamId进行甄别
            task.setStreamId(UUID.randomUUID().toString().replace("-",""));
            task.setTaskId(0);
            task.setTaskName("步骤:1");
            task.setOrgansin("ABCD" + new Random().nextInt(700));
            //完成,通知步骤1跟步骤2
            this.source.output1().send(MessageBuilder.withPayload(task).build());
            this.source.output2().send(MessageBuilder.withPayload(task).build());
            logger.info("向stream-demo1,stream-demo2发送消息:"+ task.toString());
    
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                        stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.error("步骤:1任务队列发送失败:" + task.toString());
                    }
                }
            });
        }
    }
    

    为了实时掌握任务的执行情况并反馈到页面,用了websocket,在这就不赘述了。
    任务关系:1先执行完,通知2、3,然后2、3都完成了,跑4,看下最后的效果:


    spring-cloud-stream-demo.gif

    看起来是实现了这个效果。

    相关文章

      网友评论

          本文标题:任务的流转控制与监测-spring cloud stream

          本文链接:https://www.haomeiwen.com/subject/hnofuhtx.html