个人觉得线程模型在任何技术框架领域都是需要提及的一个点,我们来看下 Elastic job 怎样维护job 的线程池的
createJobDetail 方法 返回一个job 的JobDetail 给 quartz 调用, LiteJob类其实就是job的调度入口类,quartz每次调度会产生一个LiteJob类实例,调度完成后会被回收 ,Elastic job 默认 使用 SimpleThreadPool 一个线程调度 以及 MISFIRE_INSTRUCTION_DO_NOTHING 。
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
//生成LiteJob实例时 ,JobFacade 注入
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
//生成LiteJob实例时 ,ElasticJob 注入
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
LiteJob 每次调度会执行execute 方法 实际调用的是 JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
JobExecutorFactory 根据 elasticJob 类别 创建不同Executor ,比如SimpleJobExecutor 。
线程模型直接看DefaultExecutorServiceHandler createExecutorService 方法中返回的ExecutorServiceObject
/**
* 默认线程池服务处理器.
*
* @author zhangliang
*/
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {
@Override
public ExecutorService createExecutorService(final String jobName) {
return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
}
}
**
* 线程池执行服务对象.
*
* @author zhangliang
*/
public final class ExecutorServiceObject {
private final ThreadPoolExecutor threadPoolExecutor;
private final BlockingQueue<Runnable> workQueue;
public ExecutorServiceObject(final String namingPattern, final int threadSize) {
workQueue = new LinkedBlockingQueue<>();
threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
}
/**
* 创建线程池服务对象.
*
* @return 线程池服务对象
*/
public ExecutorService createExecutorService() {
return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
}
public boolean isShutdown() {
return threadPoolExecutor.isShutdown();
}
/**
* 获取当前活跃的线程数.
*
* @return 当前活跃的线程数
*/
public int getActiveThreadCount() {
return threadPoolExecutor.getActiveCount();
}
/**
* 获取待执行任务数量.
*
* @return 待执行任务数量
*/
public int getWorkQueueSize() {
return workQueue.size();
}
}
这里会根据jobname 创建线程namingPattern , 当前处理器数*2 的数量 core threads ,max threads , 5分钟idle 回收时间 。workQueue 为LinkedBlockingQueue
new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
生成ExecutorServiceObject会以 jobname ExecutorServiceObject 映射方式 缓冲到一个hashmap中
简单说 一个 job 一个上例线程池
一个jvm实例 有多个 job , 每个job 在该实例上分片数又大于处理器数*2 的数量 ,随着job不断增加 , 单个job任务执行时间可能会变长 ,有可能超过平时的任务完成超时时间 ,造成任务失败 ,需要重点注意
我们思考下 如果一台机器 处理器数 2 , 线程池 就是 4 , 如果 分片是 5 , 就是说 一个分片会被排队 ,实际完成时间 >2 个分片 完成时间
网友评论