美文网首页
任务的流转控制与监测-spring cloud stream

任务的流转控制与监测-spring cloud stream

作者: 风里有神通 | 来源:发表于2020-03-26 02:44 被阅读0次

背景:

  • 公司系统上有很多任务流程要一连串的执行,而且任务之间还有很多依赖关系。比如,A任务执行完了以后,B任务开始执行,但B跟C同时完成的情况下,才能执行D任务。简单的办法,我们可以写几个定时任务,各自分好时段,你跑完了,我再跑。但是这样的话,一整套流程跑下来,执行等待的时间会很长很长。那我们可以不间断的,像加了三通的水管里的水一样,流下去执行吗?答案是,可以的。spring家族的事件驱动型框架有好几个,结合MQ(官方首推kafka跟rabbit)之后,还是能玩得转的。
    其实这东西还是比较好理解,任务之间的关系,通过MQ消息队列来进行传递维系。我执行完了往队列里放一条消息,告诉下一个,你可以开始跑了。实现这些都比较简单,利用官方提供的手册,研究研究还是能很快捯饬出来。但是,只有当B任务跟C任务同时完成后,哪怕B先完成了,也得等C执行完,这种情况下,才能往下执行D,这个怎么控制呢?我们知道,一般mq就两种模式,发布订阅跟生产消费,想了半天也没弄明白怎么利用这两模式完成这个控制动作。后来想了想,既然控制不了消息的订阅与消费,那就控制方法的执行,退而求其次呗。
    脑子转到这,就想到了aop,利用环绕通知来进行控制,结合redis做个中转,再统计下各个任务的执行情况后进行判断代码是否继续往下执行。嗯.....,感觉行得通,来试试效果怎么样。
    首先,我们得把这个依赖关系先解决,很显然,D是依赖B跟C的。在下面代码里,步骤4的执行,是依赖步骤2跟步骤3的,这个依赖关系设置,为了方便我做了个注解,往下看代码。我们把任务先枚举出来。
public enum TaskEnum {
        STEPONEININIT(0,"步骤:1"),
        STEPTWOINIT(1,"步骤:2"),
        STEPTHREEINIT(2,"步骤:3"),
        STEPFOURINIT(3,"步骤:4")
        ;

        private int taskId;
        private String taskName;

        TaskEnum(int taskId, String taskName) {
            this.taskId = taskId;
            this.taskName = taskName;
        }

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
}

接下来是注解,


/**
 * 
 * @author yangpin
 *
 */
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface TaskDependsOn {

    @NotNull
    TaskEnum[] taskEnum();

}

然后是关键的aop,用的环绕通知,


/**
* @Author yangpin
* @Desciption aop处理
* @Date 1:58 2020-03-26
**/
@Aspect
@Component
public class TaskListenerAspect {
 
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final String TaskStreamMonitorQueue = "TaskMonitor:TaskStreamMonitorQueue";


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    /**
    * @Author yangpin
    * @Desciption 参数处理
    * @Date 22:16 2020-03-18
    * @Param [joinPoint]
    * @return com.demo.aop.TaskListenerAspect.Params
    **/
    public Params getAnnotationValue(JoinPoint joinPoint) throws Exception {
        String targetName = joinPoint.getTarget().getClass().getName();
        String methodName = joinPoint.getSignature().getName();
        Object[] arguments = joinPoint.getArgs();
        Class targetClass = Class.forName(targetName);
        Method[] methods = targetClass.getMethods();
        Params params = new Params();
        for (Method method : methods) {
            if (method.getName().equals(methodName)) {
                Class[] classes = method.getParameterTypes();
                if (classes.length == arguments.length) {
                    TaskEnum[] taskEnums = method.getAnnotation(TaskDependsOn.class).taskEnum();
                    Task task = (Task) arguments[0];
                    params.setTask(task);
                    params.setTaskEnums(taskEnums);
                    break;
                }
            }
        }
        return params;
    }


    @Pointcut("execution(* com.demo..*(..))  && @annotation(com.demo.aop.TaskDependsOn)")
    public void TaskListener() {}
 
    @Around("TaskListener()")
    public void round(ProceedingJoinPoint joinPoint) throws Throwable {
        Params params = getAnnotationValue(joinPoint);
        //判断方法是否执行
        if (processInputArg(params))
            joinPoint.proceed();
    }

    /**
    * @Author yangpin
    * @Desciption 利用redis暂存任务的执行情况
    * @Date 1:52 2020-03-26
    * @Param [params]
    * @return boolean
    **/
    private boolean processInputArg(Params params) {
        boolean proceed = false;

        List<String> keys = new ArrayList<>();

        //为了确保任务批次唯一,可以依照自己的业务规则,进行key值设定
        String key = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + params.getTask().getTaskId();

        stringRedisTemplate.opsForValue().set(key,params.getTask().toString());

        TaskEnum[] taskEnums = params.getTaskEnums();

        for (int i = 0; i < taskEnums.length; i++) {
            String tmpKey = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + taskEnums[i].getTaskId();
            keys.add(tmpKey);
        }

        List<String> strings = stringRedisTemplate.opsForValue().multiGet(keys);

        strings.removeAll(Collections.singleton(null));

        //TODO 从注解上获取任务之间的依赖关系,当最后一个依赖的任务执行完毕并放入redis后,取出所有依赖的任务进行统计
        //TODO 注解依赖的任务数 == redis中已经执行完的任务数 ---> 可以放行  
        if (strings.size() == taskEnums.length) proceed = true;

        return proceed;
    }



    class Params{

        TaskEnum[] taskEnums;

        Task task;

        public TaskEnum[] getTaskEnums() {
            return taskEnums;
        }

        public void setTaskEnums(TaskEnum[] taskEnums) {
            this.taskEnums = taskEnums;
        }

        public Task getTask() {
            return task;
        }

        public void setTask(Task task) {
            this.task = task;
        }

        public Params() {
        }

        public Params(TaskEnum[] taskEnums, Task task) {
            this.taskEnums = taskEnums;
            this.task = task;
        }
    }


}

我们来看下注解的用法。这是任务“步骤:4”的处理类,为了能在页面实时监控每一步任务执行了多少,用了redis的队列进行处理。


@EnableBinding(DemoBinding.class)
public class TaskSink {

    private static final Logger logger = LoggerFactory.getLogger(TaskSink.class);


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();

        //填入该任务所依赖的两个任务枚举值,分别是"步骤:2"跟"步骤:3"
    @TaskDependsOn(taskEnum = {TaskEnum.STEPTWOINIT,TaskEnum.STEPTWOINIT})
    @StreamListener(DemoBinding.INPUT_3)
    public void process(Task task) {
        logger.info("stream-demo3收到消息,"+ task.toString()+",开始步骤:4!" );
        task.setTaskId(3);
        task.setTaskName("步骤:4");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:4任务队列发送失败:" + task.toString());
                }
            }
        });
    }

}

好,再来看下“步骤:2”跟“步骤:3”是怎么处理的,


@EnableBinding(DemoBinding.class)
public class TaskProcessor {

    private static final Logger logger = LoggerFactory.getLogger(TaskProcessor.class);


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();


    @StreamListener(DemoBinding.INPUT_1)
    @SendTo({DemoBinding.OUTPUT_3})
    public Task process(Task task) {
        logger.info("stream-demo1,收到消息" + task.toString() + ",处理完毕!");
        //任务id-1
        task.setTaskId(1);
        task.setTaskName("步骤:2");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:2任务队列发送失败:" + task.toString());
                }
            }
        });
        return task;
    }

    @StreamListener(DemoBinding.INPUT_2)
    @SendTo({DemoBinding.OUTPUT_3})
    public Task process2(Task task) {
        logger.info("stream-demo2,收到消息" + task.toString() + ",处理完毕!");
        //任务id-2
        task.setTaskId(2);
        task.setTaskName("步骤:3");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:3任务队列发送失败:" + task.toString());
                }
            }
        });
        return task;
    }
}

我们继续往下看“步骤:1”,也就是源头任务,怎么处理的。


@EnableScheduling
@EnableBinding(DemoBinding.class)
public class TaskSender {

    private static final Logger logger = LoggerFactory.getLogger(TaskSender.class);

    @Autowired
    private DemoBinding source;

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();

    @Scheduled(initialDelay = 10000,fixedDelay = 3000)
    public void sendEvents() {
        Task task = new Task();
        //任务id-0,为保证任务流程批次的唯一性,利用uuid设置streamId进行甄别
        task.setStreamId(UUID.randomUUID().toString().replace("-",""));
        task.setTaskId(0);
        task.setTaskName("步骤:1");
        task.setOrgansin("ABCD" + new Random().nextInt(700));
        //完成,通知步骤1跟步骤2
        this.source.output1().send(MessageBuilder.withPayload(task).build());
        this.source.output2().send(MessageBuilder.withPayload(task).build());
        logger.info("向stream-demo1,stream-demo2发送消息:"+ task.toString());

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("步骤:1任务队列发送失败:" + task.toString());
                }
            }
        });
    }
}

为了实时掌握任务的执行情况并反馈到页面,用了websocket,在这就不赘述了。
任务关系:1先执行完,通知2、3,然后2、3都完成了,跑4,看下最后的效果:


spring-cloud-stream-demo.gif

看起来是实现了这个效果。

相关文章

网友评论

      本文标题:任务的流转控制与监测-spring cloud stream

      本文链接:https://www.haomeiwen.com/subject/hnofuhtx.html