美文网首页
【方案设计】任务监控

【方案设计】任务监控

作者: 嘻洋洋 | 来源:发表于2021-11-12 10:57 被阅读0次

    1.业务场景:

    1.1 场景一

    餐饮POS打印,多处(客户端)下单,共享一个打印机,因为共享资源等情况,那么可能出现打印失败异常情况,针对这些打印失败任务,我们需要知道,并且自动重新打印。因此需要设计一套监控失败任务体系,打印失败后自动触发监控,执行相应的纠错措施。

    1.2 场景二

    电商退款场景,财务进行退款流程操作后,需要通知外部系统,同时定时调用银行接口退款,是否退款完成状态需要通知各个系统。 因此需要设计一个后台监控程序,收到各种消息后,通知各个系统。

    2.设计思路

    (1)首先,监控程序是在后台一直运行(否则无法监控),因此利用多线程启动监控程序,一直运行在后台。
    (2)需要有一个队列存放新产生的任务,因为同一时刻可能产生大量任务,系统会来不及处理。
    (3)监控程序要一直运行,因此需要设计为用无限循环(while (true)),这个无限循环可以阻塞挂起(没有监控到任务时),如果监控到新产生的任务时能继续执行。

    针对第1、2点提到队列,阻塞进程的问题,使用阻塞队列LinkedBlockingQueue存放产生的任务可以完美适配。该队列在线程中是安全的、先进先出、生成消费型首先、阻塞队列。当使用take()方法获取队列信息时,一旦队列为空,则进入阻塞状态,等待新任务,一旦有新任务产生调用put()方法,则阻塞解除,程序继续执行。我们把队列设计为静态全局变量。目前已知有两种可行的方案

    3 方案一

    业务场景一采用此方案,具体步骤如:
    (1)启动采用使用static {},创建后台任务监控任务(一个线程)。
    (2)多线程使用scheduledThreadPool来调度执行监控任务,监控任务一直运行在后台。
    (3)监控任务使用无限循环结合LinkedBlockingQueue队列

    以下是监控pos打印失败,失败任务重新打印的例子:

        private static ScheduledExecutorService scheduledThreadPool = Executors
               .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
         private static boolean ERROR_MONITOR_THREAD_INITED = false;
         static {
           if (!ERROR_MONITOR_THREAD_INITED) {
               scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
                   @Override
                   public void run() {
                       PrintTask printTask;
                       LOGGER.debug("=============Start a error print job process thread!~===========");
                       try {
                           while (true) {
                             //如果队列为空,则阻塞,就一直等待有数据进来(挂起)
                               printTask = errorJobQueue.take();
                               PrintTaskManager.doPrint(printTask);
                           }
                       } catch (Exception e) {
                       }
                   }
               }
               , 0, 5, TimeUnit.SECONDS);
               ERROR_MONITOR_THREAD_INITED = true;
           }
       }
    
    • 在载入打印任务管理类时,就启动实时监控---static{}。
    • 启动后每隔5秒调度执行一个线程,线程任务就是把失败的打印任务重打,用while不停循环从失败任务队列获取任务重打。也就是说一直有一个线程在执行重打错误打印任务 ,为了防止重打任务异常导致整个监控线程结束,因此才有了每隔5秒再次启动监控重打错误任务。
    • 所有的错误任务放在一个队列中

    4.方案二

    业务场景一采用此方案,具体步骤如:
    (1)项目启动后开启监听程序,通过实现ApplicationRunner接口来实现,ApplicationRunner具体用法参照我另外一篇文章
    (2)创建监听任务,可以有多个,比如退款通知refundNotify,开票通知invoiceNotify、失败后从新发起的通知timingRetryNotify任务等,每监听任务就是一个线程,然后再启用线程池调度执行各个监听任务,监听任务启动后一直运行在后台

        @Override
        public void run(ApplicationArguments args)
        {
            String url01 = url + "/erp2ld_refund_status";
            String url02 = url + "/erp2ld_inv_status";
            RefundNotify refundNotify = new RefundNotify(refundQueue, mRefundNotifyMapper, url01);
            InvoiceNotify invoiceNotify = new InvoiceNotify(invoiceQueue, mApplicationNotifyMapper, url02);
            TimingRetryNotify timingRetryNotify = new TimingRetryNotify(timingRetryQueue, mRefundNotifyMapper, mApplicationFormMapper,
                    mApplicationNotifyMapper, url01, url02);
            //获取系统处理器个数,作为线程池数量
            int nThreads = Runtime.getRuntime().availableProcessors();
            ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("member-pool-%d").build();
            //Common Thread Pool
            ExecutorService pool = new ThreadPoolExecutor(5, 200,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
            pool.execute(refundNotify);
            pool.execute(invoiceNotify);
            pool.execute(timingRetryNotify);
        }
    

    (3)监听任务是一个线程,使用无限循环结合LinkedBlockingQueue队列
    如果有新任务产生,任务标识放入队列即可,挂起的线程继续执行任务。

        @SneakyThrows
        @Override
        public void run()
        {
            while (true)
            {
                List<com.andy.modules.erp.member.pojo.RefundNotify> list = mRefundNotifyMapper.selectToNotify();
                if (list.size() > 0)
                {
                    for (com.andy.modules.erp.member.pojo.RefundNotify mRefundNotify : list)
                    {
                        RetryTemplate retryTemplate = new RetryTemplate();
                        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                        backOffPolicy.setInitialInterval(3000);
                        backOffPolicy.setMultiplier(2);
                        backOffPolicy.setMaxInterval(15000);
                        retryTemplate.setBackOffPolicy(backOffPolicy);
                        retryTemplate.execute(
                                (RetryCallback<Object, Throwable>) retryContext -> {
                                    refundNotify(mRefundNotify);
                                    return "通知成功";
                                },
                                retryContext -> "通知失败");
                    }
                }
              // 如果没有通知,则阻塞
                logger.info(queue.take());
            }
        }
    

    (4)如果任务执行失败,比如退款通知失败(调用外部系统),无法收到触发任务,因此需要定时任务每隔一段时间把退款通知失败的任务重新发起。

    @Component("MNotifyTask")
    public class NotifyTask implements ITask {
        private static Logger logger = LoggerFactory.getLogger(AutoRepayServiceImpl.class);
        @Override
        public void run(String params) throws InterruptedException {
            //定时推送数据到LD
            logger.info("定时推送数据到LD");
            StartService.timingRetryQueue.put("定时推送数据到LD");
        }
    }
    

    相关文章

      网友评论

          本文标题:【方案设计】任务监控

          本文链接:https://www.haomeiwen.com/subject/ihmmoltx.html