,如...">
美文网首页
多线程异步执行定时任务

多线程异步执行定时任务

作者: 海上旭日 | 来源:发表于2021-09-01 16:27 被阅读0次

    核心关键点:
    @Async("npmExecutor") 异步调用的返回值只能是 void 或者 Future<T>;如果调用 Future.get() 等待异步调用结果,则调用 get() 的线程(主线程)会被阻塞,直到拿到结果后才会继续执行

    @Async源码注释中可以看到具体用法解释:
    public @interface Async {
    
        /**
         * A qualifier value for the specified asynchronous operation(s).
         * <p>Article note (not part of the Spring source): set this to the name of your
         * own thread pool bean ({@code "npmExecutor"} in this article); otherwise the
         * default executor is used and its maximum thread count is not the 50 configured
         * for that pool.
         * <p>May be used to determine the target executor to be used when executing
         * the asynchronous operation(s), matching the qualifier value (or the bean
         * name) of a specific {@link java.util.concurrent.Executor Executor} or
         * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
         * bean definition.
         * <p>When specified on a class-level {@code @Async} annotation, indicates that the
         * given executor should be used for all methods within the class. Method-level use
         * of {@code Async#value} always overrides any value set at the class level.
         * @since 3.1.2
         */
        String value() default "";
    
    
    

    定时任务执行类

    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    @Component
    @EnableScheduling
    public class ScheduledService {
    
    
        @Autowired
        @Qualifier("npmExecutor")
        private ThreadPoolExecutor executor;
        @Autowired
        private TestFuture testFuture;
        @Autowired
        private TestMulti testMulti;
    
    
        @Scheduled(cron = "*/3 * * * * ?")
        @Async("npmExecutor")
        public void testMulti() {
            log.info("testMulti 执行了={}",DateUtils.formatDate(new Date()));
            log.info("testMulti 激活线程数={},任务数={},已完成数={},队列大小={}",
                    executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount(),executor.getQueue().size());
            try {
                for (int i =0;i<2;i++){
                    Future<String> submit = executor.submit(testFuture);
                    log.info("执行结果={}",submit.get());
                    log.info("执行中 激活线程数={},任务数={},已完成数={}",executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount());
                }
            } catch (Exception e) {
                log.error("alarm 获取数据失败={}", e);
            }
        }
    }
    

    配置线程池

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Spring configuration that defines the {@code npmExecutor} thread pool bean.
     */
    @Configuration
    public class NpmThreadPoolConfig {

        /**
         * Builds the {@code npmExecutor} pool: 30 core threads, at most 50 threads, a 30-second
         * keep-alive for idle threads above the core size, and a work queue bounded to one
         * element. Threads come from {@code NamedThreadFactory("npm")} and rejected tasks are
         * handed to {@code MyRejectHandle}.
         *
         * @return the fully configured executor
         */
        @Bean("npmExecutor")
        public ThreadPoolExecutor getExecutor() {
            // Pass the factory and handler to the constructor so the pool is never observable
            // in a half-configured state, and give the queue an explicit element type instead
            // of the raw LinkedBlockingQueue.
            return new ThreadPoolExecutor(
                    30,
                    50,
                    30L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(1),
                    new NamedThreadFactory("npm"),
                    new MyRejectHandle());
        }
    }
    

    自定义拒绝策略

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    //@Configuration
    /**
     * Rejection policy that logs the rejection and cancels the rejected task.
     *
     * <p>The handler never throws, so the submitting code is not interrupted by a rejection.
     * A task that is rejected is not executed.
     */
    @Slf4j
    public class MyRejectHandle implements RejectedExecutionHandler {

        /**
         * Logs the rejecting executor's state and, when the rejected task is a
         * {@link java.util.concurrent.Future}, cancels it.
         *
         * <p>Tasks handed to {@code ExecutorService.submit} reach this handler wrapped in a
         * Future. Without the cancellation such a Future would never complete, and any caller
         * blocked in {@code Future.get()} would wait forever; after cancellation {@code get()}
         * fails fast with a {@code CancellationException} instead.
         *
         * @param r        the task that could not be accepted
         * @param executor the executor that rejected the task
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.warn("you task is rejected.{}", executor);
            if (r instanceof java.util.concurrent.Future) {
                // The task never started, so there is no running thread to interrupt.
                ((java.util.concurrent.Future<?>) r).cancel(false);
            }
        }
    }
    任务类
    /**
     * Demo task: sleeps for 30 seconds, then reports how many times it has completed and
     * which thread ran it.
     */
    @Slf4j
    @Service
    public class TestFuture implements Callable<String> {

        /** Number of completed invocations of {@link #call()} on this instance. */
        AtomicInteger atomicInteger = new AtomicInteger();

        /**
         * Sleeps for 30000 ms, increments the counter and builds the result string.
         *
         * @return {@code "TestFuture:<count>:<thread name>"}
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public String call() throws Exception {
            Thread.sleep(30000);
            final int sequence = atomicInteger.incrementAndGet();
            final String workerName = Thread.currentThread().getName();
            return "TestFuture:" + sequence + ":" + workerName;
        }
    }
    

    相关文章

      网友评论

          本文标题:多线程异步执行定时任务

          本文链接:https://www.haomeiwen.com/subject/jyaxwltx.html