美文网首页
java程序定时任务做数据报警

java程序定时任务做数据报警

作者: 酷酷的美猴王 | 来源:发表于2020-10-12 14:25 被阅读0次

1、在启动类中注入线程池(任务调度器)

/**
 * Registers the scheduler-backed executor bean named {@code taskExecutor}.
 *
 * <p>The pool has 20 threads whose names start with {@code task-}. On shutdown the pool
 * lets already-submitted tasks finish and blocks the container for a bounded time while
 * they do so.
 *
 * @return the configured {@link ThreadPoolTaskScheduler}, exposed as an {@link Executor}
 */
@Bean(value = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);
        scheduler.setThreadNamePrefix("task-");
        // Let submitted tasks complete on shutdown instead of interrupting them.
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // The flag above does not by itself make the container wait; an await-termination
        // timeout is required for the remaining beans to stay alive while tasks finish.
        scheduler.setAwaitTerminationSeconds(60);
        // Rejection policy: CallerRunsPolicy runs a rejected task on the thread that called
        // execute; if the executor has already been shut down the task is discarded.
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }

2、定时任务

// Report definitions that were successfully scheduled as of the previous refresh, keyed by id.
private Map<Long, ReporterCheck> lastMap = new HashMap<>();
    // Scheduler handle for every id in lastMap; used to cancel or replace that task.
    private Map<Long, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    /**
     * Re-synchronises the scheduled report tasks with the rows returned by
     * {@code reportCheckMapper.selectAll()}. Triggered every five minutes.
     *
     * <ul>
     *   <li>A row not seen before is scheduled with its cron expression.</li>
     *   <li>A row whose cron expression changed is rescheduled; the old task is cancelled
     *       only after the new one has been scheduled successfully.</li>
     *   <li>A row that disappeared has its task cancelled and removed.</li>
     * </ul>
     *
     * <p>A row that cannot be scheduled (invalid/null cron, rejected task, trigger that never
     * fires) is logged and skipped so that it neither aborts the refresh for the other rows
     * nor leaves {@code lastMap} and {@code futureMap} out of step. It is retried on the next
     * refresh.
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void dynamic() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler;

        // Switch the data source before reading the report definitions.
        DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());

        List<ReporterCheck> reporterChecks = reportCheckMapper.selectAll();
        Map<Long, ReporterCheck> currReport = new HashMap<>();

        for (ReporterCheck reporterCheck : reporterChecks) {
            Long rid = reporterCheck.getId();
            String cron = reporterCheck.getCorn();
            ReporterCheck previous = lastMap.get(rid);

            // Already scheduled with the same cron expression: nothing to do.
            if (previous != null && cron != null && cron.equals(previous.getCorn())) {
                currReport.put(rid, reporterCheck);
                continue;
            }

            // New row, or cron expression changed.
            ScheduledFuture<?> scheduledFuture = scheduleOrNull(threadPoolTaskScheduler, rid, cron);
            if (scheduledFuture == null) {
                // Keep the previous definition (and its still-running task) so the change
                // is detected and retried on the next refresh.
                if (previous != null) {
                    currReport.put(rid, previous);
                }
                continue;
            }

            ScheduledFuture<?> replaced = futureMap.put(rid, scheduledFuture);
            if (replaced != null) {
                // Let an in-flight run of the old schedule finish.
                replaced.cancel(false);
            }
            currReport.put(rid, reporterCheck);
        }

        // Cancel tasks whose definition no longer exists.
        for (Long key : lastMap.keySet()) {
            if (!currReport.containsKey(key)) {
                ScheduledFuture<?> removed = futureMap.remove(key);
                if (removed != null) {
                    removed.cancel(true);
                }
            }
        }

        // Remember what is scheduled now for the next comparison.
        lastMap = currReport;
    }

    /** Schedules a {@link Task} for the given id; logs and returns {@code null} on any failure. */
    private ScheduledFuture<?> scheduleOrNull(ThreadPoolTaskScheduler scheduler, Long rid, String cron) {
        try {
            ScheduledFuture<?> future = scheduler.schedule(new Task(rid), new CronTrigger(cron));
            if (future == null) {
                logger.warn("报表任务ID={}的cron表达式不会再触发, cron={}", rid, cron);
            }
            return future;
        } catch (RuntimeException e) {
            logger.error("调度报表任务ID={}失败, cron={}", rid, cron, e);
            return null;
        }
    }

    /** Runnable that executes one report check, identified by its id. */
    class Task implements Runnable {
        private final Long id;

        public Task(Long id) {
            this.id = id;
        }

        /**
         * Delegates to {@code reportCheckService.handleReport(id)}. Any exception is logged
         * with its stack trace and not rethrown.
         */
        @Override
        public void run() {
            try {
                logger.info("执行任务{}开始", id);
                reportCheckService.handleReport(id);
                logger.info("执行任务{}结束", id);
            } catch (Exception e) {
                // Pass the exception itself as the last argument so the stack trace is kept.
                logger.error("执行报表任务ID={}异常", id, e);
            }
        }
    }

3、逻辑执行

/**
 * Runs the report check with the given id and sends a mail when it produces content.
 *
 * <p>For module type 1 the result of {@code produceIndicatorData} is mailed when it is a
 * non-empty map. For any other module type the rows returned by {@code getResultFromSQL}
 * are compared with the expectations parsed from {@code getExpectRes()}
 * ({@code key:value} pairs separated by {@code ;}); rows that violate any expectation are
 * mailed. When no expectation is configured, all rows are mailed.
 *
 * @param id primary key of the report check to run
 * @throws IllegalStateException if no report check exists for {@code id}
 */
public void handleReport(Long id) {
        // Switch the data source before loading the report definition.
        DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());
        ReporterCheck reporterCheck = reporterCheckMapper.selectByPrimaryKey(id);
        if (reporterCheck == null) {
            // The row may have been deleted after the task was scheduled.
            throw new IllegalStateException("报表任务不存在: id=" + id);
        }

        // Module types: 1 = component (API), 2 = SQL.
        if (reporterCheck.getModuleType().equals(1)) {
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) produceIndicatorData(reporterCheck);
            // produceIndicatorData returns null when the reflective call fails.
            if (map != null && !map.isEmpty()) {
                sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(map), null, reporterCheck.getToUcid());
            }
            return;
        }

        // SQL branch.
        List<Map<String, Object>> res = getResultFromSQL(reporterCheck);

        Map<String, String> expectedMap = new HashMap<>();
        String expectRes = reporterCheck.getExpectRes();
        if (expectRes != null) {
            for (String item : expectRes.split(";")) {
                String[] keyValue = item.split(":");
                if (keyValue.length == 2) {
                    expectedMap.put(keyValue[0], keyValue[1]);
                }
            }
        }

        List<Map<String, Object>> contentList = new ArrayList<>();
        for (Map<String, Object> item : res) {
            // No expectation configured: every row is reported.
            if (expectedMap.isEmpty() || violatesExpectation(item, expectedMap)) {
                contentList.add(item);
            }
        }

        if (!contentList.isEmpty()) {
            sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(contentList), null, reporterCheck.getToUcid());
        }
    }

    /** Returns true when the row lacks an expected column or its string value differs. */
    private boolean violatesExpectation(Map<String, Object> row, Map<String, String> expectedMap) {
        for (Map.Entry<String, String> expected : expectedMap.entrySet()) {
            Object actual = row.get(expected.getKey());
            if (actual == null || !expected.getValue().equals(actual.toString())) {
                return true;
            }
        }
        return false;
    }

/**
 * Logs the check's topic, switches to the data source named by the check, and returns
 * the rows produced by passing {@code getModule()} to {@code reporterCheckMapper.querySql}.
 *
 * @param reporterCheck the report check supplying topic, data source and module text
 * @return the rows returned by the mapper
 */
private List<Map<String, Object>> getResultFromSQL(ReporterCheck reporterCheck) {
        log.info("topic:{}", reporterCheck.getTopic());

        // Switch the data source before querying.
        String dataSource = reporterCheck.getDataSource();
        DataBaseTypeEnum.changeDataBaseTypeEnum(dataSource);

        return reporterCheckMapper.querySql(reporterCheck.getModule());
    }

/**
 * Renders the given value as pretty-printed JSON text.
 *
 * <p>The value is first serialised with {@code JSONObject.toJSONString}, then re-read and
 * written with the {@code ObjectMapper} default pretty printer.
 *
 * @param contentList the value to render (a map or a list of maps in the visible callers)
 * @return the pretty-printed JSON, or an empty string if formatting fails
 */
private String formatPut(Object contentList) {
        String content = JSONObject.toJSONString(contentList);
        ObjectMapper mapper = new ObjectMapper();
        try {
            Object obj = mapper.readValue(content, Object.class);
            return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
        } catch (IOException e) {
            // Log as an error and keep the stack trace; the caller still gets "".
            log.error("内容输出格式化失败:{}", e.getMessage(), e);
            return "";
        }
    }

/**
 * Invokes the component method named by the check's module string and returns its result.
 *
 * <p>The module string has the form {@code <class name>#<method name>}. The simple class
 * name, decapitalised, is used as the bean name for {@code SpringUtil.getBean}; the method
 * is looked up with the parameter types {@code (ReporterCheck, Map)}. The map argument is
 * built from {@code getParams()}, parsed as {@code key:value} pairs separated by {@code ;}.
 *
 * @param reporterCheck the report check supplying the module string and parameters
 * @return whatever the invoked method returns, or {@code null} when the module string is
 *         malformed or the reflective lookup/invocation fails
 */
public Object produceIndicatorData(ReporterCheck reporterCheck) {
        String module = reporterCheck.getModule();
        String[] api = module == null ? new String[0] : module.split("#");
        if (api.length < 2 || api[0].isEmpty() || api[1].isEmpty()) {
            // Without this guard a module lacking '#' fails with an uncaught index error.
            log.error("组件配置格式错误, 期望 类名#方法名: {}", module);
            return null;
        }

        try {
            String className = SpringUtil.decapitalize(api[0].substring(api[0].lastIndexOf('.') + 1));

            Object obj = SpringUtil.getBean(className);

            Class<?> clazz = obj.getClass();

            Method method = clazz.getDeclaredMethod(api[1], ReporterCheck.class, Map.class);

            Map<String, Object> map = new HashMap<>();
            String params = reporterCheck.getParams();
            if (params != null) {
                for (String item : params.split(";")) {
                    String[] keyValue = item.split(":");
                    if (keyValue.length == 2) {
                        map.put(keyValue[0], keyValue[1]);
                    }
                }
            }

            return method.invoke(obj, reporterCheck, map);

        } catch (NoSuchMethodException e) {
            log.error("获取指定方式失败:{}", e.getMessage(), e);
        } catch (IllegalAccessException e) {
            log.error("反射调用方法参数错误:{}", e.getMessage(), e);
        } catch (InvocationTargetException e) {
            // The useful information is the exception thrown by the target method.
            log.error("反射调用方法InvocationTargetException错误:{}", e.getCause(), e);
        }
        return null;
    }

相关文章

网友评论

      本文标题:java程序定时任务做数据报警

      本文链接:https://www.haomeiwen.com/subject/rzgupktx.html