A Look at TaskExecutorMetricsAutoConfiguration

Author: go4it | Published 2023-08-20 09:21

    Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which automatically attaches metrics to thread pools.

    TaskExecutorMetricsAutoConfiguration

    spring-boot-actuator-autoconfigure-2.7.14-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/task/TaskExecutorMetricsAutoConfiguration.java

    /**
     * {@link EnableAutoConfiguration Auto-configuration} for metrics on all available
     * {@link ThreadPoolTaskExecutor task executors} and {@link ThreadPoolTaskScheduler task
     * schedulers}.
     *
     * @author Stephane Nicoll
     * @author Scott Frederick
     * @since 2.6.0
     */
    @AutoConfiguration(after = { MetricsAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class,
            TaskExecutionAutoConfiguration.class, TaskSchedulingAutoConfiguration.class })
    @ConditionalOnClass(ExecutorServiceMetrics.class)
    @ConditionalOnBean({ Executor.class, MeterRegistry.class })
    public class TaskExecutorMetricsAutoConfiguration {
    
        @Autowired
        public void bindTaskExecutorsToRegistry(Map<String, Executor> executors, MeterRegistry registry) {
            executors.forEach((beanName, executor) -> {
                if (executor instanceof ThreadPoolTaskExecutor) {
                    monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskExecutor) executor), beanName);
                }
                else if (executor instanceof ThreadPoolTaskScheduler) {
                    monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskScheduler) executor), beanName);
                }
            });
        }
    
        private void monitor(MeterRegistry registry, ThreadPoolExecutor threadPoolExecutor, String name) {
            if (threadPoolExecutor != null) {
                new ExecutorServiceMetrics(threadPoolExecutor, name, Collections.emptyList()).bindTo(registry);
            }
        }
    
        private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskExecutor taskExecutor) {
            try {
                return taskExecutor.getThreadPoolExecutor();
            }
            catch (IllegalStateException ex) {
                return null;
            }
        }
    
        private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskScheduler taskScheduler) {
            try {
                return taskScheduler.getScheduledThreadPoolExecutor();
            }
            catch (IllegalStateException ex) {
                return null;
            }
        }
    
    }
    

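    This iterates over all Executor beans and calls monitor for every ThreadPoolTaskExecutor and ThreadPoolTaskScheduler. monitor creates an ExecutorServiceMetrics named after the bean and binds it to the MeterRegistry. Other Executor types are skipped, and so is an executor whose underlying pool is not initialized yet (safeGetThreadPoolExecutor returns null when an IllegalStateException is thrown).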

    ExecutorServiceMetrics

    micrometer-core-1.9.13-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

    @NonNullApi
    @NonNullFields
    public class ExecutorServiceMetrics implements MeterBinder {
    
        private static boolean allowIllegalReflectiveAccess = true;
    
        private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);
    
        private static final String DEFAULT_EXECUTOR_METRIC_PREFIX = "";
    
        @Nullable
        private final ExecutorService executorService;
    
        private final Iterable<Tag> tags;
    
        private final String metricPrefix;
    
        public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
                Iterable<Tag> tags) {
            this(executorService, executorServiceName, DEFAULT_EXECUTOR_METRIC_PREFIX, tags);
        }
    
        /**
         * Create an {@code ExecutorServiceMetrics} instance.
         * @param executorService executor service
         * @param executorServiceName executor service name which will be used as
         * {@literal name} tag
         * @param metricPrefix metrics prefix which will be used to prefix metric name
         * @param tags additional tags
         * @since 1.5.0
         */
        public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
                String metricPrefix, Iterable<Tag> tags) {
            this.executorService = executorService;
            this.tags = Tags.concat(tags, "name", executorServiceName);
            this.metricPrefix = sanitizePrefix(metricPrefix);
        }
    
    
        @Override
        public void bindTo(MeterRegistry registry) {
            if (executorService == null) {
                return;
            }
    
            String className = executorService.getClass().getName();
    
            if (executorService instanceof ThreadPoolExecutor) {
                monitor(registry, (ThreadPoolExecutor) executorService);
            }
            else if (executorService instanceof ForkJoinPool) {
                monitor(registry, (ForkJoinPool) executorService);
            }
            else if (allowIllegalReflectiveAccess) {
                if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {
                    monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));
                }
                else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
                    monitor(registry,
                            unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
                }
                else {
                    log.warn("Failed to bind as {} is unsupported.", className);
                }
            }
            else {
                log.warn("Failed to bind as {} is unsupported or reflective access is not allowed.", className);
            }
        }       
    
        // ...... 
    
    }    
    

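    The key method here is bindTo, which dispatches on the executor type: ThreadPoolExecutor and ForkJoinPool are monitored directly. When allowIllegalReflectiveAccess is true, the two java.util.concurrent.Executors wrapper classes are unwrapped via reflection first. Anything else only produces a warning in the log.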
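To get a feel for which branch of bindTo a given executor would reach, the JDK-only sketch below mirrors the order of the instanceof checks. It does not use Micrometer: the class `BindToDispatchSketch` and the method `classify` are hypothetical names for illustration, and the returned strings are just labels.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;

public class BindToDispatchSketch {

    // Mirrors the order of the type checks in bindTo; returns a label instead of registering meters.
    public static String classify(ExecutorService executorService) {
        if (executorService == null) {
            return "skipped";
        }
        if (executorService instanceof ThreadPoolExecutor) {
            return "ThreadPoolExecutor";
        }
        if (executorService instanceof ForkJoinPool) {
            return "ForkJoinPool";
        }
        // Wrapper classes from Executors (or any other implementation) end up here.
        return "other: " + executorService.getClass().getName();
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService scheduled = Executors.newScheduledThreadPool(1);
        ExecutorService forkJoin = new ForkJoinPool(2);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println("fixed     -> " + classify(fixed));
            System.out.println("scheduled -> " + classify(scheduled));
            System.out.println("forkJoin  -> " + classify(forkJoin));
            System.out.println("single    -> " + classify(single));
        } finally {
            fixed.shutdown();
            scheduled.shutdown();
            forkJoin.shutdown();
            single.shutdown();
        }
    }
}
```

A ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor, so it takes the first branch. The executor returned by `Executors.newSingleThreadExecutor()` is a wrapper rather than a ThreadPoolExecutor, and the wrapper's class name depends on the JDK version, so the sketch prints it instead of assuming one.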

    monitor ThreadPoolExecutor

        private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
            if (tp == null) {
                return;
            }
    
            FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
                .tags(tags)
                .description("The approximate total number of tasks that have completed execution")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)
                .tags(tags)
                .description("The approximate number of threads that are actively executing tasks")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())
                .tags(tags)
                .description("The approximate number of tasks that are queued for execution")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())
                .tags(tags)
                .description("The number of additional elements that this queue can ideally accept without blocking")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
                .tags(tags)
                .description("The current number of threads in the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)
                .tags(tags)
                .description("The core number of threads for the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)
                .tags(tags)
                .description("The maximum allowed number of threads in the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        }
    

    For a ThreadPoolExecutor, the reported metrics are executor.completed (a FunctionCounter) plus the gauges executor.active, executor.queued, executor.queue.remaining, executor.pool.size, executor.pool.core and executor.pool.max.
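The meters above simply sample getters on the pool, so their values can be reproduced with the JDK alone. The following is a minimal sketch under that assumption: `ThreadPoolGaugeSketch`, `snapshot` and `busySnapshot` are hypothetical names, the map keys reuse the metric names only as labels, and no Micrometer registry is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolGaugeSketch {

    // Reads the same ThreadPoolExecutor getters that the meters sample.
    public static Map<String, Long> snapshot(ThreadPoolExecutor tp) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.completed", tp.getCompletedTaskCount());
        values.put("executor.active", (long) tp.getActiveCount());
        values.put("executor.queued", (long) tp.getQueue().size());
        values.put("executor.queue.remaining", (long) tp.getQueue().remainingCapacity());
        values.put("executor.pool.size", (long) tp.getPoolSize());
        values.put("executor.pool.core", (long) tp.getCorePoolSize());
        values.put("executor.pool.max", (long) tp.getMaximumPoolSize());
        return values;
    }

    // Submits three blocking tasks to a pool with 2 core threads and a queue of 10,
    // then takes a snapshot while two tasks are running and one is waiting in the queue.
    public static Map<String, Long> busySnapshot() {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                tp.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Wait until both core threads are actually inside their tasks.
            started.await();
            return snapshot(tp);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks to start", e);
        } finally {
            release.countDown();
            tp.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(busySnapshot());
    }
}
```

In this setup the snapshot shows 2 active threads, 1 queued task, 9 remaining queue slots, a pool size of 2 and 0 completed tasks. The maximum of 4 is never reached, because a ThreadPoolExecutor only grows beyond its core size once the queue is full.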

    monitor ForkJoinPool

        private void monitor(MeterRegistry registry, ForkJoinPool fj) {
            FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
                .tags(tags)
                .description("Estimate of the total number of tasks stolen from "
                        + "one thread's work queue by another. The reported value "
                        + "underestimates the actual total number of steals when the pool " + "is not quiescent")
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
                .tags(tags)
                .description("An estimate of the total number of tasks currently held in queues by worker threads")
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
                .tags(tags)
                .description("An estimate of the number of threads that are currently stealing or executing tasks")
                .register(registry);
    
            Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
                .tags(tags)
                .description(
                        "An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
                .register(registry);
        }
    

    For a ForkJoinPool, the reported metrics are executor.steals (a FunctionCounter) plus the gauges executor.queued, executor.active and executor.running.
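The ForkJoinPool meters are likewise thin wrappers around pool getters. Here is a small JDK-only sketch of what they sample: `ForkJoinGaugeSketch` and `snapshot` are hypothetical names, and the map keys reuse the metric names only as labels. All four values are estimates according to the ForkJoinPool documentation, so no particular output is assumed for the snapshot.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ForkJoinGaugeSketch {

    // Reads the same ForkJoinPool getters that the meters sample.
    public static Map<String, Long> snapshot(ForkJoinPool fj) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.steals", fj.getStealCount());
        values.put("executor.queued", fj.getQueuedTaskCount());
        values.put("executor.active", (long) fj.getActiveThreadCount());
        values.put("executor.running", (long) fj.getRunningThreadCount());
        return values;
    }

    public static void main(String[] args) {
        ForkJoinPool fj = new ForkJoinPool(2);
        try {
            // Run some parallel work inside the pool so the counters have something to report.
            Callable<Integer> work = () -> IntStream.rangeClosed(1, 1000).parallel().sum();
            int sum = fj.submit(work).join();
            System.out.println("sum=" + sum);
            System.out.println(snapshot(fj));
        } finally {
            fj.shutdown();
        }
    }
}
```

Because the steal count underestimates the real number while the pool is not quiescent, values read right after a burst of work may differ from run to run.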

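    Summary

    Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which uses Micrometer's ExecutorServiceMetrics to report metrics for ThreadPoolTaskExecutor and ThreadPoolTaskScheduler beans. Services that upgrade to this version or later no longer need to register metrics for these thread pools by hand.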
