美文网首页java高级开发
SpringBoot之@Async异步调用

SpringBoot之@Async异步调用

作者: 老鼠AI大米_Java全栈 | 来源:发表于2021-08-02 16:17 被阅读0次

    场景说明,最近需要开发视频转码,前端上传视频到后端,保存视频,转码,切片,加密,水印,最后上传到云存储,这一系列操作需要很多时间,所以需要异步处理。

    环境配置

    利用 Spring Initializer 创建一个 gradle 项目 spring-boot-async-task,创建时添加相关依赖。
    在 Spring Boot 入口类上配置 @EnableAsync 注解开启异步处理。

    @SpringBootApplication
    @EnableAsync
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    

    创建任务抽象类 AbstractTask,并分别配置三个任务方法 doTaskOne(),doTaskTwo(),doTaskThree()。

    public abstract class AbstractTask {
        private static Random random = new Random();
    
        public void doTaskOne() throws Exception {
            out.println("开始做任务一");
            long start = currentTimeMillis();
            sleep(random.nextInt(10000));
            long end = currentTimeMillis();
            out.println("完成任务一,耗时:" + (end - start) + "毫秒");
        }
    
        public void doTaskTwo() throws Exception {
            out.println("开始做任务二");
            long start = currentTimeMillis();
            sleep(random.nextInt(10000));
            long end = currentTimeMillis();
            out.println("完成任务二,耗时:" + (end - start) + "毫秒");
        }
    
        public void doTaskThree() throws Exception {
            out.println("开始做任务三");
            long start = currentTimeMillis();
            sleep(random.nextInt(10000));
            long end = currentTimeMillis();
            out.println("完成任务三,耗时:" + (end - start) + "毫秒");
        }
    }
    

    同步调用

    下面通过一个简单示例来直观的理解什么是同步调用:
    定义 Task 类,继承 AbstractTask,三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10 秒内)。

    @Component
    public class Task extends AbstractTask {
    }
    

    在 单元测试 用例中,注入 Task 对象,并在测试用例中执行 doTaskOne(),doTaskTwo(),doTaskThree() 三个方法。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TaskTest {
        @Autowired
        private Task task;
    
        @Test
        public void testSyncTasks() throws Exception {
            task.doTaskOne();
            task.doTaskTwo();
            task.doTaskThree();
        }
    }
    

    执行单元测试,可以看到类似如下输出:

    开始做任务一
    完成任务一,耗时:4059毫秒
    开始做任务二
    完成任务二,耗时:6316毫秒
    开始做任务三
    完成任务三,耗时:1973毫秒
    

    任务一、任务二、任务三顺序的执行完了,换言之 doTaskOne(),doTaskTwo(),doTaskThree() 三个方法顺序的执行完成。

    异步调用

    上述的可以看到 执行时间比较长,若这三个任务本身之间 不存在依赖关系,可以 并发执行 的话,同步调用在 执行效率 方面就比较差,可以考虑通过 异步调用 的方式来 并发执行。

    创建 AsyncTask类,分别在方法上配置 @Async 注解,将原来的 同步方法 变为 异步方法。

    @Component
    public class AsyncTask extends AbstractTask {
        @Async
        public void doTaskOne() throws Exception {
            super.doTaskOne();
        }
    
        @Async
        public void doTaskTwo() throws Exception {
            super.doTaskTwo();
        }
    
        @Async
        public void doTaskThree() throws Exception {
            super.doTaskThree();
        }
    }
    

    在 单元测试 用例中,注入 AsyncTask 对象,并在测试用例中执行 doTaskOne(),doTaskTwo(),doTaskThree() 三个方法。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class AsyncTaskTest {
        @Autowired
        private AsyncTask task;
    
        @Test
        public void testAsyncTasks() throws Exception {
            task.doTaskOne();
            task.doTaskTwo();
            task.doTaskThree();
        }
    }
    

    执行单元测试,可以看到类似如下输出:

    开始做任务三
    开始做任务一
    开始做任务二

    如果反复执行单元测试,可能会遇到各种不同的结果,比如:

    • 没有任何任务相关的输出
    • 有部分任务相关的输出
    • 乱序的任务相关的输出

    原因是目前 doTaskOne(),doTaskTwo(),doTaskThree() 这三个方法已经 异步执行 了。主程序在 异步调用 之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就 自动结束 了,导致了 不完整 或是 没有输出任务 相关内容的情况。

    注意:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效。
    调用与被修饰方法不能写在同一个函数中。

    什么时候需要异步(@Async)

    根据业务需求,可以将暂时不需要立即获得处理的方法设置为@Async.

    比如用户在前端点击完成了登录操作,这时候根据业务要求需要在登录成功之后进行埋点的处理.

    其实埋点成功与否都不影响用户操作,这时候就可以将埋点方法设置为@Async.

    个人认为此类任务通常有三个特征:

    • 业务优先级低.
    • 运行时间长,可能会造成卡顿.
    • 返回结果暂时不立即处理,包括exception.

    异步回调(并发)

    为了让 doTaskOne(),doTaskTwo(),doTaskThree() 能正常结束,假设我们需要统计一下三个任务 并发执行 共耗时多少,这就需要等到上述三个函数都完成动用之后记录时间,并计算结果。

    那么我们如何判断上述三个 异步调用 是否已经执行完成呢?我们需要使用 Future<T> 来返回 异步调用 的 结果。

    创建 AsyncCallBackTask 类,声明 doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个方法,对原有的三个方法进行包装。
    
    @Component
    public class AsyncCallBackTask extends AbstractTask {
        @Async
        public Future<String> doTaskOneCallback() throws Exception {
            super.doTaskOne();
            return new AsyncResult<>("任务一完成");
        }
    
        @Async
        public Future<String> doTaskTwoCallback() throws Exception {
            super.doTaskTwo();
            return new AsyncResult<>("任务二完成");
        }
    
        @Async
        public Future<String> doTaskThreeCallback() throws Exception {
            super.doTaskThree();
            return new AsyncResult<>("任务三完成");
        }
    }
    

    在 单元测试 用例中,注入 AsyncCallBackTask 对象,并在测试用例中执行 doTaskOneCallback(),doTaskTwoCallback(),doTaskThreeCallback() 三个方法。循环调用 Future 的 isDone() 方法等待三个 并发任务 执行完成,记录最终执行时间。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class AsyncCallBackTaskTest {
        @Autowired
        private AsyncCallBackTask task;
    
        @Test
        public void testAsyncCallbackTask() throws Exception {
            long start = currentTimeMillis();
            Future<String> task1 = task.doTaskOneCallback();
            Future<String> task2 = task.doTaskTwoCallback();
            Future<String> task3 = task.doTaskThreeCallback();
    
            // 三个任务并发调用完成,退出循环等待
            while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
                sleep(1000);
            }
    
            long end = currentTimeMillis();
            out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
        }
    }
    

    在测试用例一开始记录开始时间;在调用三个异步函数的时候,返回Future类型的结果对象;在调用完三个异步函数之后,开启一个循环,根据返回的Future对象来判断三个异步函数是否都结束了。若都结束,就结束循环;若没有都结束,就等1秒后再判断。跳出循环之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。

    执行一下上述的单元测试,可以看到如下结果:

    开始做任务一
    开始做任务三
    开始做任务二
    完成任务二,耗时:4882毫秒
    完成任务三,耗时:6484毫秒
    完成任务一,耗时:8748毫秒
    任务全部完成,总耗时:9043毫秒
    

    可以看到,通过 异步调用,让任务一、任务二、任务三 并发执行,有效的 减少 了程序的 运行总时间。

    定义线程池

    在上述操作中,创建一个 线程池配置类 TaskConfiguration ,并配置一个 任务线程池对象 taskExecutor。

    @Configuration
    public class TaskConfiguration {
        @Bean("taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("taskExecutor-");
            executor.setRejectedExecutionHandler(new CallerRunsPolicy());
            return executor;
        }
    }
    

    上面我们通过使用 ThreadPoolTaskExecutor 创建了一个 线程池,同时设置了以下这些参数:

    线程池属性 属性的作用 设置初始值
    核心线程数 线程池创建时候初始化的线程数 10
    最大线程数 线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程 20
    缓冲队列 用来缓冲执行任务的队列 200
    允许线程的空闲时间 当超过了核心线程之外的线程,在空闲时间到达之后会被销毁 60秒
    线程池名的前缀 可以用于定位处理任务所在的线程池 taskExecutor-
    线程池对拒绝任务的处理策略 这里采用CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务 CallerRunsPolicy

    创建 AsyncExecutorTask类,三个任务的配置和 AsyncTask 一样,不同的是 @Async 注解需要指定前面配置的 线程池的名称 taskExecutor。

    @Component
    public class AsyncExecutorTask extends AbstractTask {
        @Async("taskExecutor")
        public void doTaskOne() throws Exception {
            super.doTaskOne();
            out.println("任务一,当前线程:" + currentThread().getName());
        }
    
        @Async("taskExecutor")
        public void doTaskTwo() throws Exception {
            super.doTaskTwo();
            out.println("任务二,当前线程:" + currentThread().getName());
        }
    
        @Async("taskExecutor")
        public void doTaskThree() throws Exception {
            super.doTaskThree();
            out.println("任务三,当前线程:" + currentThread().getName());
        }
    }
    

    在 单元测试 用例中,注入 AsyncExecutorTask 对象,并在测试用例中执行 doTaskOne(),doTaskTwo(),doTaskThree() 三个方法。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class AsyncExecutorTaskTest {
        @Autowired
        private AsyncExecutorTask task;
    
        @Test
        public void testAsyncExecutorTask() throws Exception {
            task.doTaskOne();
            task.doTaskTwo();
            task.doTaskThree();
    
            sleep(30 * 1000L);
        }
    }
    

    执行一下上述的 单元测试,可以看到如下结果:

    开始做任务一
    开始做任务三
    开始做任务二
    完成任务二,耗时:3905毫秒
    任务二,当前线程:taskExecutor-2
    完成任务一,耗时:6184毫秒
    任务一,当前线程:taskExecutor-1
    完成任务三,耗时:9737毫秒
    任务三,当前线程:taskExecutor-3
    

    执行上面的单元测试,观察到 任务线程池 的 线程池名的前缀 被打印,说明 线程池 成功执行 异步任务!

    优雅地关闭线程池

    由于在应用关闭的时候异步任务还在执行,导致类似 数据库连接池 这样的对象一并被 销毁了,当 异步任务 中对 数据库 进行操作就会出错。

    解决方案如下,重新设置线程池配置对象,新增线程池 setWaitForTasksToCompleteOnShutdown() 和 setAwaitTerminationSeconds() 配置:

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
        executor.setPoolSize(20);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setMaxPoolSize(20); // 线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程
        executor.setQueueCapacity(200); // 缓冲任务队列的大小
        executor.setKeepAliveSeconds(60); // 允许线程的空闲时间,超过会被销毁
        return executor;
    }
    
    • setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
    • setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

    相关文章

      网友评论

        本文标题:SpringBoot之@Async异步调用

        本文链接:https://www.haomeiwen.com/subject/ecxjvltx.html