A Look at TaskExecutorMetricsAutoConfigu

Author: go4it | Published 2023-08-20 09:21

Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which automatically registers metrics for the application's thread pools.

TaskExecutorMetricsAutoConfiguration

spring-boot-actuator-autoconfigure-2.7.14-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/task/TaskExecutorMetricsAutoConfiguration.java

/**
 * {@link EnableAutoConfiguration Auto-configuration} for metrics on all available
 * {@link ThreadPoolTaskExecutor task executors} and {@link ThreadPoolTaskScheduler task
 * schedulers}.
 *
 * @author Stephane Nicoll
 * @author Scott Frederick
 * @since 2.6.0
 */
@AutoConfiguration(after = { MetricsAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class,
        TaskExecutionAutoConfiguration.class, TaskSchedulingAutoConfiguration.class })
@ConditionalOnClass(ExecutorServiceMetrics.class)
@ConditionalOnBean({ Executor.class, MeterRegistry.class })
public class TaskExecutorMetricsAutoConfiguration {

    @Autowired
    public void bindTaskExecutorsToRegistry(Map<String, Executor> executors, MeterRegistry registry) {
        executors.forEach((beanName, executor) -> {
            if (executor instanceof ThreadPoolTaskExecutor) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskExecutor) executor), beanName);
            }
            else if (executor instanceof ThreadPoolTaskScheduler) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskScheduler) executor), beanName);
            }
        });
    }

    private void monitor(MeterRegistry registry, ThreadPoolExecutor threadPoolExecutor, String name) {
        if (threadPoolExecutor != null) {
            new ExecutorServiceMetrics(threadPoolExecutor, name, Collections.emptyList()).bindTo(registry);
        }
    }

    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskExecutor taskExecutor) {
        try {
            return taskExecutor.getThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            return null;
        }
    }

    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskScheduler taskScheduler) {
        try {
            return taskScheduler.getScheduledThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            return null;
        }
    }

}

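bindTaskExecutorsToRegistry iterates over the Executor beans and calls monitor for each ThreadPoolTaskExecutor or ThreadPoolTaskScheduler, passing the bean name as the executor name; other Executor types are skipped. monitor then creates an ExecutorServiceMetrics and binds it to the MeterRegistry.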

ExecutorServiceMetrics

micrometer-core-1.9.13-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

@NonNullApi
@NonNullFields
public class ExecutorServiceMetrics implements MeterBinder {

    private static boolean allowIllegalReflectiveAccess = true;

    private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);

    private static final String DEFAULT_EXECUTOR_METRIC_PREFIX = "";

    @Nullable
    private final ExecutorService executorService;

    private final Iterable<Tag> tags;

    private final String metricPrefix;

    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
            Iterable<Tag> tags) {
        this(executorService, executorServiceName, DEFAULT_EXECUTOR_METRIC_PREFIX, tags);
    }

    /**
     * Create an {@code ExecutorServiceMetrics} instance.
     * @param executorService executor service
     * @param executorServiceName executor service name which will be used as
     * {@literal name} tag
     * @param metricPrefix metrics prefix which will be used to prefix metric name
     * @param tags additional tags
     * @since 1.5.0
     */
    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
            String metricPrefix, Iterable<Tag> tags) {
        this.executorService = executorService;
        this.tags = Tags.concat(tags, "name", executorServiceName);
        this.metricPrefix = sanitizePrefix(metricPrefix);
    }


    @Override
    public void bindTo(MeterRegistry registry) {
        if (executorService == null) {
            return;
        }

        String className = executorService.getClass().getName();

        if (executorService instanceof ThreadPoolExecutor) {
            monitor(registry, (ThreadPoolExecutor) executorService);
        }
        else if (executorService instanceof ForkJoinPool) {
            monitor(registry, (ForkJoinPool) executorService);
        }
        else if (allowIllegalReflectiveAccess) {
            if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {
                monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));
            }
            else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
                monitor(registry,
                        unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
            }
            else {
                log.warn("Failed to bind as {} is unsupported.", className);
            }
        }
        else {
            log.warn("Failed to bind as {} is unsupported or reflective access is not allowed.", className);
        }
    }       

    // ...... 

}    

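The core here is the bindTo method, which handles ThreadPoolExecutor and ForkJoinPool directly. When allowIllegalReflectiveAccess is true, it also unwraps the two delegating wrapper classes from Executors; anything else is logged as unsupported.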
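The branch order of bindTo can be sketched with JDK classes only. This is a minimal illustration, not Micrometer code: BindToDispatchSketch, classify, demo and the returned labels are hypothetical names, and the sketch only reports which branch would be taken instead of unwrapping anything reflectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper: mirrors the branch order of bindTo, nothing more.
class BindToDispatchSketch {

    static String classify(ExecutorService executorService) {
        if (executorService == null) {
            return "skipped"; // bindTo returns early for a null executor
        }
        if (executorService instanceof ThreadPoolExecutor) {
            return "thread-pool"; // also covers ScheduledThreadPoolExecutor, a subclass
        }
        if (executorService instanceof ForkJoinPool) {
            return "fork-join";
        }
        String className = executorService.getClass().getName();
        if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")
                || className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
            return "needs-unwrap"; // bindTo unwraps these reflectively when allowed
        }
        return "unsupported";
    }

    // Classifies three executors created through the Executors factory methods.
    static List<String> demo() {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService stealing = Executors.newWorkStealingPool();
        ExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        try {
            List<String> labels = new ArrayList<>();
            labels.add(classify(fixed));
            labels.add(classify(stealing));
            labels.add(classify(scheduled));
            return labels;
        } finally {
            fixed.shutdown();
            stealing.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The single-thread scheduled executor is the interesting case: it is a wrapper rather than a ThreadPoolExecutor, which is why bindTo needs the class-name comparison at all.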

monitor ThreadPoolExecutor

    private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
        if (tp == null) {
            return;
        }

        FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
            .tags(tags)
            .description("The approximate total number of tasks that have completed execution")
            .baseUnit(BaseUnits.TASKS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)
            .tags(tags)
            .description("The approximate number of threads that are actively executing tasks")
            .baseUnit(BaseUnits.THREADS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())
            .tags(tags)
            .description("The approximate number of tasks that are queued for execution")
            .baseUnit(BaseUnits.TASKS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())
            .tags(tags)
            .description("The number of additional elements that this queue can ideally accept without blocking")
            .baseUnit(BaseUnits.TASKS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
            .tags(tags)
            .description("The current number of threads in the pool")
            .baseUnit(BaseUnits.THREADS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)
            .tags(tags)
            .description("The core number of threads for the pool")
            .baseUnit(BaseUnits.THREADS)
            .register(registry);

        Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)
            .tags(tags)
            .description("The maximum allowed number of threads in the pool")
            .baseUnit(BaseUnits.THREADS)
            .register(registry);
    }

For a ThreadPoolExecutor, the reported metrics are executor.completed, executor.active, executor.queued, executor.queue.remaining, executor.pool.size, executor.pool.core, and executor.pool.max.
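To make the meaning of these seven values concrete, the sketch below reads the same ThreadPoolExecutor getters directly, without Micrometer. ThreadPoolGettersSketch, snapshot and demo are hypothetical names, and the pool sizes (2 core threads, 4 max, a bounded queue of 10) are assumptions chosen so the numbers are easy to follow.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolGettersSketch {

    // Reads the getters that back each metric; keys reuse the metric names for readability.
    static Map<String, Long> snapshot(ThreadPoolExecutor tp) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.completed", tp.getCompletedTaskCount());
        values.put("executor.active", (long) tp.getActiveCount());
        values.put("executor.queued", (long) tp.getQueue().size());
        values.put("executor.queue.remaining", (long) tp.getQueue().remainingCapacity());
        values.put("executor.pool.size", (long) tp.getPoolSize());
        values.put("executor.pool.core", (long) tp.getCorePoolSize());
        values.put("executor.pool.max", (long) tp.getMaximumPoolSize());
        return values;
    }

    // Occupies both core threads with blocked tasks, queues three more, then takes a snapshot.
    static Map<String, Long> demo() {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 2; i++) {
                tp.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (int i = 0; i < 3; i++) {
                tp.execute(() -> { }); // waits in the queue because both core threads are busy
            }
            started.await(); // make sure both blocking tasks are actually running
            return snapshot(tp);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            tp.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With two running tasks and three queued ones, the snapshot should show 2 active threads, 3 queued tasks, 7 remaining queue slots, and a pool size equal to the core size of 2; the pool does not grow toward the maximum of 4 because the queue is not full.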

monitor ForkJoinPool

    private void monitor(MeterRegistry registry, ForkJoinPool fj) {
        FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
            .tags(tags)
            .description("Estimate of the total number of tasks stolen from "
                    + "one thread's work queue by another. The reported value "
                    + "underestimates the actual total number of steals when the pool " + "is not quiescent")
            .register(registry);

        Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
            .tags(tags)
            .description("An estimate of the total number of tasks currently held in queues by worker threads")
            .register(registry);

        Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
            .tags(tags)
            .description("An estimate of the number of threads that are currently stealing or executing tasks")
            .register(registry);

        Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
            .tags(tags)
            .description(
                    "An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
            .register(registry);
    }

For a ForkJoinPool, the reported metrics are executor.steals, executor.queued, executor.active, and executor.running.
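The four ForkJoinPool values can be read the same way. The sketch below is an illustration under assumptions: ForkJoinGettersSketch, RangeSum and snapshot are hypothetical names, and the range-summing task exists only to give the pool some work. Because the JDK documents these getters as estimates, no specific values are claimed for them here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinGettersSketch {

    // Hypothetical workload: sums the integers in [from, to) by splitting the range in half.
    static class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            left.fork(); // left half becomes available for stealing
            return right.compute() + left.join();
        }
    }

    // Reads the getters that back each metric; keys reuse the metric names for readability.
    static Map<String, Long> snapshot(ForkJoinPool fj) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.steals", fj.getStealCount());
        values.put("executor.queued", fj.getQueuedTaskCount());
        values.put("executor.active", (long) fj.getActiveThreadCount());
        values.put("executor.running", (long) fj.getRunningThreadCount());
        return values;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println("sum = " + pool.invoke(new RangeSum(0, 100_000)));
            System.out.println(snapshot(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that executor.steals is registered as a FunctionCounter in the code above, since the steal count only ever grows, while the other three are gauges that can move in both directions.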

Summary

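Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which uses Micrometer's ExecutorServiceMetrics to report metrics for Executor beans. Services that upgrade to this version or later no longer need to wire up thread pool metrics by hand for their ThreadPoolTaskExecutor and ThreadPoolTaskScheduler beans.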
