美文网首页
线程池异常处理

线程池异常处理

作者: 茶还是咖啡 | 来源:发表于2021-07-07 22:19 被阅读0次

一个栗子

模拟线程池中任务在执行过程中发生异常。

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            int i = 1 / 0;
        });

这段代码在执行中,肯定会报一个除0异常,但是我们不会收到任何错误的信息。原因是线程池会将执行过程中发生的异常信息存储起来,然后通过调用get方法时,如果有异常,会抛出一个被ExecutionException包裹的异常;具体原理可查看 https://juejin.cn/post/6961729520793550861 ,所以我们需要判断任务执行时,是否抛出了异常,可以通过try-catch代码块,捕获处理异常。

    @Test
    public void test() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try{
            executorService.submit(() -> {
                int i = 1 / 0;
            }).get();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
    }

这种方式虽然可以处理异常,但是如果调用了get方法,会阻塞当前线程,直到run方法执行完毕,这样一来就失去了异步的意义,显然是不行的。


针对这样的情况,我们可以将我们自己执行的任务进行包装,然后再提交给线程池处理,这样一来,任务执行报错时,可以直接走我们的提前准备的异常处理逻辑,这样,即可实现程序既是异步执行,任务执行出错,线程池也不会吃掉我们的异常。

大致意思如下:

    @Test
    public void test() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // 需要提交给线程池的任务
        Runnable task = () -> {
            int i = 1 / 0;
        };

        // 包装任务,做异常处理
        Runnable taskWrapper = () -> {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
                // todo 异常处理
            }
        };

        executorService.submit(taskWrapper);
    }

控制台输出:

java.lang.ArithmeticException: / by zero
    at org.ywb.practise.difficulty.ExecutorMain.lambda$test$0(ExecutorMain.java:20)
    at org.ywb.practise.difficulty.ExecutorMain.lambda$test$1(ExecutorMain.java:26)

核心的处理逻辑就是这样,但是如果这样写代码,估计会被主管打死 :-(,为了保证程序的复用性,可以稍作改装~

  1. runnable异常处理
@FunctionalInterface
public interface RunnableErrorHandler {
    /**
     * runnable 异常处理
     *
     * @param throwable 异常
     */
    void errorHandler(Throwable throwable);
}
  1. 一个包装线程池的类
public class ExecutorServiceWrapper {

    private final ExecutorService threadPoolExecutor;

    private RunnableErrorHandler defaultRunnableErrHandler;

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor, RunnableErrorHandler defaultRunnableErrHandler) {
        this.threadPoolExecutor = threadPoolExecutor;
        this.defaultRunnableErrHandler = defaultRunnableErrHandler;
    }

    /**
     * 不传入异常处理机制,程序使用默认异常处理机制
     *
     * @param task 执行的任务
     * @return future<Void>
     */
    public Future<?> submit(Runnable task) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                if (defaultRunnableErrHandler != null) {
                    defaultRunnableErrHandler.errorHandler(e);
                }
            }
        });
    }

    /**
     * 自定义异常处理机制
     *
     * @param task         执行的任务
     * @param errorHandler 异常处理
     * @return future<Void>
     */
    public Future<?> submit(Runnable task, RunnableErrorHandler errorHandler) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                errorHandler.errorHandler(e);
            }
        });
    }
    
}

这里只提供了关于Runnable的封装,可以自行脑补Callable的方法。

  1. 演示
    1. 这里在构造线程池wrapper时,传入默认的异常处理机制,打印异常堆栈
    2. 第一个任务使用默认异常处理机制
    3. 第二个任务使用自定义异常处理机制
    @Test
    public void test1() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // 包装原线程池,传入默认异常处理机制
        ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService, Throwable::printStackTrace);
        // 使用通用异常处理机制
        executorServiceWrapper.submit(() -> {
            int i = 1 / 0;
        });

        // 传入自定义异常处理机制
        executorServiceWrapper.submit(() -> {
            int i = 1 / 0;
        }, throwable -> {
            // 打印异常信息
            System.err.println("customer---" + throwable.getMessage());
        });
    }

输出:
可以看出,两个任务在发生错误时,分别按照规定走了特定的异常处理机制。

java.lang.ArithmeticException: / by zero
    at org.ywb.practise.difficulty.ExecutorMain.lambda$test1$2(ExecutorMain.java:43)
    at org.ywb.practise.difficulty.ExecutorServiceWrapper.lambda$submit$0(ExecutorServiceWrapper.java:30)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
customer---/ by zero

拓展阅读,其他框架对线程池异常处理的支持

如果业务相对简单,我任务使用上面的操作就可以了,但是如果业务要求比较多,希望得到更多的支持,可以使用下面的guava提供的工具。

Guava

  1. 引入依赖
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-jre</version>
        </dependency>
  1. 使用演示
    @Test
    public void test() {
        // 包装线程池
        ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // 包装任务
        ListenableFutureTask<Void> listenableFutureTask = ListenableFutureTask.create(() -> {
            int i = 1 / 0;
        }, null);

        // 给任务添加回调
        Futures.addCallback(listenableFutureTask, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void result) {
                // 成功后的回调
                System.out.println("success");
            }

            @Override
            public void onFailure(Throwable t) {
                // 异常处理
                t.printStackTrace();
            }
        }, guavaExecutor);

        // 提交任务
        guavaExecutor.submit(listenableFutureTask);
    }

guava 不仅对失败做了处理,还可以通过OnSuccess方法,对任务执行的结果添加后续操作。

netty

netty的处理方法简直爆赞,但是遗憾的是我们不能直接用,它实现的是一个自己框架使用的基于事件的一个线程池。他返回的Future结果是这样的,大家瞻仰一下,netty真的是一个非常赞的框架,希望大家有机会都学习学习。

public interface Future<V> extends java.util.concurrent.Future<V> {

    /**
     * Returns {@code true} if and only if the I/O operation was completed
     * successfully.
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *
     * @return the cause of the failure.
     *         {@code null} if succeeded or this future is not
     *         completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listener is notified immediately.
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Adds the specified listeners to this future.  The
     * specified listeners are notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listeners are notified immediately.
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners are no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listeners are not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an {@link InterruptedException} and
     * discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with a {@link CancellationException}.
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

相关文章

  • python3 线程池和异常处理

    引用 线程池的基本使用as_completedwaitmap 线程池的异常处理 进程池用法 引用 Python中已...

  • 线程池异常处理

    一个栗子 模拟线程池中任务在执行过程中发生异常。 这段代码在执行中,肯定会报一个除0异常,但是我们不会收到任何错误...

  • Java线程池异常处理

    起因 在Java默认的线程池中执行的程序,如果程序产生异常导致线程池里面的线程死掉,完全没有任何信息抛出来,这个是...

  • 线程池终探

    线程池四种拒绝策略 AbortPolicy 直接抛异常 DiscardPolicy 丢弃不处理 DiscardO...

  • 线程池使用实例

    自己定义了异常处理类,可以用于记录问题日志。 线程池流程图

  • Java线程池异常处理方案

    执行多线程并发任务的时候,如果任务类型相同,一般会考虑使用线程池,一方面利用了并发的优势,一方面避免创建大量线程得...

  • 一篇文章搞懂线程池

    线程池 什么使用使用线程池? 单个任务处理时间比较短 需要处理的任务数量很大 线程池优势 重用存在的线程,减少线程...

  • 08 线程池

    1 线程池概述 1.1 什么是线程池 线程池就是提前创建若干个线程,如果有任务需要处理,线程池里的线程就会处理任务...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • 线程池并发相关

    1.线程池大小设置 2.线程池执行过程中遇到异常会发生什么,怎样处理? 3.JUC 常用 4 大并发工具类 4.关...

网友评论

      本文标题:线程池异常处理

      本文链接:https://www.haomeiwen.com/subject/lpwaultx.html