一、背景
xxl-job作为一款开源软件,好处是开源,允许我们做二次开发;不足的地方当然也很多,下面会一一表述。比如它的可观测性、数据的治理等,直接拿来运行都是没有的。另外我们在实际运行过程中,尤其是开发和测试环境,遇到过服务无法注册到调度器、需核实任务是否按期执行了等各种需求。
二、迭代计划
- 接入pinpoint链路监控(已完成)
- 支持LDAP实现统一账户认证
- 接入sentry异常监控
- 应用监控
- 支持prometheus
- 线程池监控
- 日志治理
- xxl_job_log表的日志
- 应用执行的日志
- xxl-job-admin服务的日志
- 优化任务的执行
- http调用采用连接池技术
- 告警机制
- 告警方式增加实现企业微信和短信,原生的只支持邮箱一种方式。
- 支持灵活告警:根据服务名的不同标签,发送到具有该企业微信标签的人。
- 注册方式
- 去掉手动注册方式,限制只能在程序启动的时候,由程序注册到xxl-job-admin。
三、日志治理
1、表xxl_job_log
- xxl_job_log 采用blackhole引擎
- xxl_job_log 只保留几天的数据,定期delete数据
- xxl_job_log 定期truncate和drop/create。
2、应用执行的日志
com.xxl.job.core.thread.JobLogFileCleanThread
支持可配置:
xxl.job.executor.logretentiondays = 1 # 修改为可支持1天,源码是3天
if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {
FileUtil.deleteRecursively(childFile);
}
3、xxl-job-admin服务的日志
修改它的日志格式,和其他的业务服务的格式保持一致。
关于发布部署:让xxl-job-admin和其他的java服务一样的发布流程。唯一不同的是,该服务的配置和服务,都没有和公司的注册中心打通。
后期可以考虑将xxl-job也注册到consul注册中心,这样xxl-job-admin就不用手动创建执行器了。
4、优化任务的执行
见类:com.xxl.job.core.util.XxlJobRemotingUtil
源码采用的是jdk的java.net.HttpURLConnection,每次都创建连接和关闭连接,并没有采用池化技术。
建议修改为httpclient/okhttp等框架,利用池化技术,减少连接的开销。
5、重试次数
针对每天跑一次的定时任务,建议均配置重试次数大于0。
6、线程池的监控
xxl-job使用了大量的线程池以及手动新建线程,官方也有意向把线程修改为协程,以期提高程序的性能。
下面先罗列出来,后面根据它的优先级,进行重点的监控。(主要是看是否有工作任务的积压)
手动new 线程的方式:
- JobFailMonitorHelper Job调度失败的告警
- JobLogReportHelper 刷新Job调度的报表
- JobLosedMonitorHelper job调度超时的处理(最终被认为失败)
- JobRegistryMonitorHelper 刷新job的执行器列表
- JobScheduleHelper job的触发调度(scheduleThread和ringThread)
================================================= - JobLogFileCleanThread 删除Job运行的日志文件
- ExecutorRegistryThread 服务节点对执行器的注册和下线
- JobThread job的实际运行(每次的执行,都是一个新线程,做到线程隔离)
- TriggerCallbackThread 任务运行的回调和重试的回调(triggerCallbackThread和triggerRetryCallbackThread)
线程池的方式:注意每个线程池的配置参数。
- fastTriggerPool Job的触发耗时未超过500ms
- slowTriggerPool Job的触发耗时500ms及以上
他们都使用了阻塞队列LinkedBlockingQueue,核心线程数是10,最大线程数支持配置,
// 毫秒 转 分
private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
// choose thread pool
ExecutorService triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 根据任务ID,取得它的慢次数。注意它的类型是AtomicInteger。
// 如果大于10次,下次对该任务的调度将交由慢线程池。
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// 触发Job
// do trigger
// check timeout-count-map
// job执行后和执行前不是同一个分钟值,则清空jobTimeoutCountMap
long minTim_now = System.currentTimeMillis() / 60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis() - start;
if (cost > 500) { // ob-timeout threshold 500ms
// timeoutCount 是一个对象!!!
// 所以后面的方法incrementAndGet()是会更新jobTimeoutCountMap里的value。
// key是任务ID,value的初始化值为1。
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
总结:快慢线程池有必要监控是否有积压,以便确定任务是否能够及时调度。
网友评论