美文网首页
一个简单的异步回调实现

一个简单的异步回调实现

作者: M_lear | 来源:发表于2022-07-14 00:00 被阅读0次

    从Runnable到Callable,JDK虽然帮我们封装了异步结果的获取,但并没有为我们封装异步任务两种结果(正常执行或异常)的处理。

    虽然我们可以调用Future的get自行获取异步结果,并根据结果(成功或异常)做对应的处理逻辑。但比较呆的点在于Future的get方法在异步任务未完成前会阻塞调用线程。

    所以我们一般不这样操作。

    我们希望执行异步任务的线程,在异步任务完成后,能自动调用异步结果的处理逻辑,不需要我们另起线程等待异步任务执行完成,这便是常说的异步回调。

    虽然JDK没帮我们封装,但却贴心的为我们预留了异步回调的扩展点。这个点就是FutureTask类的done方法。

        /**
         * Protected method invoked when this task transitions to state
         * {@code isDone} (whether normally or via cancellation). The
         * default implementation does nothing.  Subclasses may override
         * this method to invoke completion callbacks or perform
         * bookkeeping. Note that you can query status inside the
         * implementation of this method to determine whether this task
         * has been cancelled.
         */
        protected void done() { }
    

    这个done方法会在异步任务执行完后调用,默认是一个空实现,我们可以在子类重写这个方法,并放上异步回调的逻辑。

    根据Callable任务的执行原理和JDK为我们预留的扩展点,我们可以方便的实现异步回调。
    关于Callable任务的执行原理可以参考:Java Callable任务

    异步回调的实现

    继承AbstractExecutorService,重写newTaskFor方法:

    import java.util.List;
    import java.util.concurrent.*;
    
    public class MyExecutorService extends AbstractExecutorService {
        // 执行异步任务的线程池
        private final ExecutorService taskExecutor;
        // 回调逻辑
        private final Callback callback;
        // 执行回调逻辑的线程池
        private final ExecutorService callbackExecutor;
    
        public MyExecutorService(ExecutorService taskExecutor, Callback callback, ExecutorService callbackExecutor) {
            this.taskExecutor = taskExecutor;
            this.callback = callback;
            this.callbackExecutor = callbackExecutor;
        }
    
    
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<>(callable) {
                // 继承FutureTask,并重写done方法
                @Override
                protected void done() {
                    T result;
                    try {
                        // 执行到这的时候,异步任务已经执行完了
                        // 这个get不会阻塞,仅用来获取异步结果
                        result = get();
                    } catch (Exception e) {
                        callbackExecutor.execute(() -> {
                            // 调用失败处理逻辑
                            callback.onFailure(e);
                        });
                        return;
                    }
                    callbackExecutor.execute(() -> {
                        // 调用成功处理逻辑
                        callback.onSuccess(result);
                    });
                }
            };
        }
    
        @Override
        public void shutdown() {
            taskExecutor.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            return taskExecutor.shutdownNow();
        }
    
        @Override
        public boolean isShutdown() {
            return taskExecutor.isShutdown();
        }
    
        @Override
        public boolean isTerminated() {
            return taskExecutor.isTerminated();
        }
    
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return taskExecutor.awaitTermination(timeout, unit);
        }
    
        @Override
        public void execute(Runnable command) {
            taskExecutor.execute(command);
        }
    }
    

    执行到done方法里的get调用时,不会阻塞,因为异步任务的结果已经设置了,可以直接get到执行结果。

    定义回调接口:

    public interface Callback {
        <T> void onSuccess(T t);
        void onFailure(Exception e);
    }
    

    是不是挺简单的!

    使用示例

    模拟异步任务成功执行:

    import java.util.concurrent.*;
    
    public class JdkCallbackMechanism {
        public static void main(String[] args) throws InterruptedException {
            // 定义回调处理逻辑
            Callback callback = new Callback() {
                @Override
                public <T> void onSuccess(T t) {
                    System.out.println("任务成功执行了,结果是:" + t);
                }
    
                @Override
                public void onFailure(Exception e) {
                    System.out.println("任务执行失败了,发生了异常:" + e);
                }
            };
    
            // 定义处理异步任务和回调任务的线程池
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
    
            // 初始化MyExecutorService
            MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
            // 定义异步任务
            Callable<Integer> callable = () -> {
                System.out.println("异步任务正在执行...");
                // 模拟耗时操作
                TimeUnit.SECONDS.sleep(15);
                return 1;
            };
    
            // 将异步任务提交给myExecutorService
            myExecutorService.submit(callable);
        }
    }
    

    执行结果:

    image.png

    模拟异步任务执行失败(用除0异常模拟):

    import java.util.concurrent.*;
    
    public class JdkCallbackMechanism {
        public static void main(String[] args) throws InterruptedException {
            // 定义回调处理逻辑
            Callback callback = new Callback() {
                @Override
                public <T> void onSuccess(T t) {
                    System.out.println("任务成功执行了,结果是:" + t);
                }
    
                @Override
                public void onFailure(Exception e) {
                    System.out.println("任务执行失败了,发生了异常:" + e);
                }
            };
    
            // 定义处理异步任务和回调任务的线程池
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
    
            // 初始化MyExecutorService
            MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
            // 定义异步任务
            Callable<Integer> callable = () -> {
                System.out.println("异步任务正在执行...");
                // 模拟耗时操作
                TimeUnit.SECONDS.sleep(15);
                return 1/0;
            };
    
            // 将异步任务提交给myExecutorService
            myExecutorService.submit(callable);
        }
    }
    

    执行结果:

    image.png

    模拟取消异步任务:

    import java.util.concurrent.*;
    
    public class JdkCallbackMechanism {
        public static void main(String[] args) throws InterruptedException {
            // 定义回调处理逻辑
            Callback callback = new Callback() {
                @Override
                public <T> void onSuccess(T t) {
                    System.out.println("任务成功执行了,结果是:" + t);
                }
    
                @Override
                public void onFailure(Exception e) {
                    System.out.println("任务执行失败了,发生了异常:" + e);
                }
            };
    
            // 定义处理异步任务和回调任务的线程池
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
    
            // 初始化MyExecutorService
            MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
            // 定义异步任务
            Callable<Integer> callable = () -> {
                System.out.println("异步任务正在执行...");
                // 模拟耗时操作
                TimeUnit.SECONDS.sleep(15);
                return 1/0;
            };
    
            // 将异步任务提交给myExecutorService
            Future<Integer> future = myExecutorService.submit(callable);
            TimeUnit.SECONDS.sleep(5);
            future.cancel(true);
        }
    }
    

    执行结果:

    image.png

    最后:工作中,其实已经有很多已经封装好的异步回调框架,不需要我们自己造轮子,比如google的guava。但本文几乎以最精简的代码实现了一个异步回调,有助于大家理解异步回调的原理。

    相关文章

      网友评论

          本文标题:一个简单的异步回调实现

          本文链接:https://www.haomeiwen.com/subject/jfkgbrtx.html