1、启动类注入线程
@Bean(value = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(20);
scheduler.setThreadNamePrefix("task-");
//用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,
scheduler.setWaitForTasksToCompleteOnShutdown(true);
//线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return scheduler;
}
2、定时任务
private Map<Long, ReporterCheck> lastMap = new HashMap<>();
private Map<Long, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
@Scheduled(cron = "0 0/5 * * * ?")
public void dynamic() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler;
//切换数据源
DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());
List<ReporterCheck> reporterChecks = reportCheckMapper.selectAll();
Map<Long, ReporterCheck> currReport = new HashMap<>();
//循环当前报表任务
reporterChecks.forEach(reporterCheck -> {
Long rid = reporterCheck.getId();
currReport.put(rid, reporterCheck);
if (!lastMap.containsKey(rid)) {
//新增,任务
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(new Task(rid),
new CronTrigger(reporterCheck.getCorn()));
futureMap.put(rid, scheduledFuture);
}
});
//循环上次的报表任务
for (Long key : lastMap.keySet()) {
if (currReport.containsKey(key)
&& !currReport.get(key).getCorn().equals(lastMap.get(key).getCorn())) {
//cron表达式有变动,则更新
futureMap.get(key).cancel(false);
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(new Task(key),
new CronTrigger(currReport.get(key).getCorn()));
futureMap.put(key, scheduledFuture);
} else if (!currReport.containsKey(key)) {
//删除任务
futureMap.get(key).cancel(true);
futureMap.remove(key);
}
}
//重新赋值上次任务列表
lastMap = currReport;
}
class Task implements Runnable {
private Long id;
public Task(Long id) {
this.id = id;
}
@Override
public void run() {
try {
logger.info("执行任务{}开始", id);
reportCheckService.handleReport(id);
logger.info("执行任务{}结束", id);
} catch (Exception e) {
logger.error("执行报表任务ID=" + id + "异常", e.getMessage());
}
}
}
3、逻辑执行
public void handleReport(Long id) {
//切换数据源
DataBaseTypeEnum.changeDataBaseTypeEnum(DataBaseTypeEnum.REPORTER.getDesc());
ReporterCheck reporterCheck = reporterCheckMapper.selectByPrimaryKey(id);
//组件:ModuleType=1 sql:ModuleType=2
if (reporterCheck.getModuleType().equals(1)) {
//API
Map<String, Object> map = (Map<String, Object>) produceIndicatorData(reporterCheck);
if (!map.isEmpty()) {
//SendMail
sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(map), null, reporterCheck.getToUcid());
}
} else {
//SQL params
List<Map<String, Object>> res = getResultFromSQL(reporterCheck);
List<Map<String, Object>> contentList = new ArrayList<>();
Map<String, String> expectedMap = new HashMap<>();
String[] expectArray = reporterCheck.getExpectRes().split(";");
for (String item : expectArray) {
String[] keyValue = item.split(":");
if (keyValue.length == 2) {
expectedMap.put(keyValue[0], keyValue[1]);
}
}
//没有指定期望
if (expectedMap.isEmpty()) {
contentList.addAll(res);
} else {
boolean flag = false;
for (Map<String, Object> item : res) {
for (String key : expectedMap.keySet()) {
if (item.get(key) == null || !expectedMap.get(key).equals(item.get(key).toString())) {
flag = true;
break;
}
}
if (flag) {
contentList.add(item);
}
flag = false;
}
}
//求优雅输出
System.out.println();
if (!contentList.isEmpty()) {
//SendMail
sendService.sendMail(reporterCheck.getTopic(), reporterCheck.getContent() + ":\r\n" + formatPut(contentList), null, reporterCheck.getToUcid());
}
}
}
private List<Map<String, Object>> getResultFromSQL(ReporterCheck reporterCheck) {
log.info("topic:{}", reporterCheck.getTopic());
//切换数据源
DataBaseTypeEnum.changeDataBaseTypeEnum(reporterCheck.getDataSource());
List<Map<String, Object>> res = reporterCheckMapper.querySql(reporterCheck.getModule());
return res;
}
private String formatPut(Object contentList) {
String content = JSONObject.toJSONString(contentList);
ObjectMapper mapper = new ObjectMapper();
Object obj = null;
try {
obj = mapper.readValue(content, Object.class);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
} catch (IOException e) {
log.info("内容输出格式化失败:{}", e.getMessage());
}
return "";
}
public Object produceIndicatorData(ReporterCheck reporterCheck) {
String[] api = reporterCheck.getModule().split("#");
try {
String className = SpringUtil.decapitalize(api[0].substring(api[0].lastIndexOf(".") + 1, api[0].length()));
Object obj = SpringUtil.getBean(className);
Class clazz = obj.getClass();
Method method = clazz.getDeclaredMethod(api[1], ReporterCheck.class, Map.class);
Map<String, Object> map = new HashMap<>();
String[] params = reporterCheck.getParams().split(";");
for (String item : params) {
String[] keyValue = item.split(":");
if (keyValue.length == 2) {
map.put(keyValue[0], keyValue[1]);
}
}
return method.invoke(obj, reporterCheck, map);
} catch (NoSuchMethodException e) {
log.info("获取指定方式失败:{}", e.getMessage());
} catch (IllegalAccessException e) {
log.info("反射调用方法参数错误:{}", e.getMessage());
} catch (InvocationTargetException e) {
log.info("反射调用方法InvocationTargetException错误:{}", e);
}
return null;
}
网友评论