美文网首页Java 杂谈
Java 线程池 之 被“吃掉”的线程异常(附源码分析和解决方法

Java 线程池 之 被“吃掉”的线程异常(附源码分析和解决方法

作者: Web前端学习营 | 来源:发表于2019-04-25 16:16 被阅读0次

今天遇到了一个bug,现象是,一个任务放入线程池中,似乎“没有被执行”,日志也没有打。

经过本地代码调试之后,发现在任务逻辑的前半段,抛出了 NPE ,但是代码外层没有 try-catch ,导致这个异常被吃掉。

这个问题解决起来是很简单的,外层加个 try-catch 就好了,但是这个异常如果没有被catch,线程池内部逻辑是怎么处理这个异常的呢?这个异常最后会跑到哪里呢?

哦对了,在分享这篇文字前,我先说一下,我这里有一份Java学习资料,直接加我的Java直播学习群:1004944760就能免费领取,长期真实有效。

带着疑问和好奇心,我研究了一下线程池那一块的源码,并且做了以下的总结。

源码分析

项目中出问题的代码差不多就是下面这个样子

ExecutorService threadPool = Executors.newFixedThreadPool(3);threadPool.submit(() -> {    String pennyStr =null;    Double penny = Double.valueOf(pennyStr);    ...})

先进到 newFixedThreadPool 这个工厂方法中看生成的具体实现类,发现是 ThreadPoolExecutor

publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue());    }

再看这个类的继承关系,

再进到 submit 方法,这个方法在 ExecutorService 接口中约定,其实是在 AbstractExectorService 中实现, ThreadPoolExecutor 并没有override这个方法。

publicFuture submit(Runnabletask) {if(task==null)thrownewNullPointerException();        RunnableFuture ftask = newTaskFor(task,null);        execute(ftask);returnftask;    }protected RunnableFuture newTaskFor(Runnable runnable, T value) {returnnewFutureTask(runnable, value);    }

对应的 FutureTask对象的 构造方法

publicFutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;// state由volatile 修饰 保证多线程下的可见性}

对应 Callable 对象的构造方法

publicstatic Callable callable(Runnabletask, T result) {if(task==null)thrownewNullPointerException();returnnewRunnableAdapter(task, result);    }

对应 RunnableAdapter 对象的构造方法

/**

    * A callable that runs given task and returns given result

    * 一个能执行所给任务并且返回结果的Callable对象

    */staticfinalclassRunnableAdapterimplementsCallable {finalRunnabletask;finalT result;        RunnableAdapter(Runnabletask, T result) {this.task=task;this.result = result;        }publicTcall() {task.run();returnresult;        }    }

总结上面的, newTaskFor 就是把我们提交的 Runnable 对象包装成了一个 Future 。

接下来就是会把任务提交到队列中给线程池调度处理:

publicvoidexecute(Runnable command){if(command ==null)thrownewNullPointerException();intc = ctl.get();if(workerCountOf(c) < corePoolSize) {if(addWorker(command,true))return;            c = ctl.get();        }if(isRunning(c) && workQueue.offer(command)) {intrecheck = ctl.get();if(! isRunning(recheck) &&remove(command))                reject(command);elseif(workerCountOf(recheck)==0)                addWorker(null,false);        }elseif(!addWorker(command,false))reject(command);    }

因为主要关心的是这个线程怎么执行,异常的抛出和处理,所以我们暂时不解析多余的逻辑。很容易发现,如果任务要被执行,肯定是进到了 addWorker 方法当中,所以我们再进去看,鉴于 addWorker 方法的很长,不想列太多的代码,我就摘了关键代码段:

privatebooleanaddWorker(Runnable firstTask,booleancore){  ...booleanworkerStarted =false;booleanworkerAdded =false;  Worker w =null;try{// 实例化一个worker对象w =newWorker(firstTask);finalThread t = w.thread;if(t !=null) {finalReentrantLock mainLock =this.mainLock;          mainLock.lock();try{intrs = runStateOf(ctl.get());if(rs < SHUTDOWN ||                  (rs == SHUTDOWN && firstTask ==null)) {if(t.isAlive())// precheck that t is startablethrownewIllegalThreadStateException();                  workers.add(w);ints = workers.size();if(s > largestPoolSize)                      largestPoolSize = s;                  workerAdded =true;              }          }finally{              mainLock.unlock();          }if(workerAdded) {// 从Worker对象的构造方法看,当这个thread对象start之后,// 之后实际上就是调用Worker对象的run()t.start();              workerStarted =true;          }      }  }finally{if(! workerStarted)          addWorkerFailed(w);  }returnworkerStarted;}// Worker的构造方法Worker(Runnable firstTask) {            setState(-1);// inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);        }

我们再看这个 ThreadPoolExecutor 的内部类 Worker 对象:

privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{        .../** Delegates main run loop to outer runWorker  */publicvoidrun(){            runWorker(this);        }      ...  }

看来真正执行任务的是在这个外部的 runWorker 当中,让我们再看看这个方法是怎么消费 Worker 线程的。

finalvoidrunWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnabletask= w.firstTask;    w.firstTask =null;    w.unlock();// allow interruptsbooleancompletedAbruptly =true;try{while(task!=null|| (task= getTask()) !=null) {            w.lock();if((runStateAtLeast(ctl.get(), STOP) ||                (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();try{                beforeExecute(wt,task);                Throwable thrown =null;// ==== 关键代码 start ====try{// 很简洁明了,调用了任务的run方法task.run();                }catch(RuntimeException x) {                    thrown = x;throwx;                }catch(Error x) {                    thrown = x;throwx;                }catch(Throwable x) {                    thrown = x;thrownewError(x);                }finally{                    afterExecute(task, thrown);                }// ==== 关键代码 end ====}finally{task=null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly =false;    }finally{        processWorkerExit(w, completedAbruptly);    }}

终于走到底了,可以看到关键代码中的try-catch block代码块中,调用了本次执行任务的 run方法。

// ==== 关键代码 start ====try{// 很简洁明了,调用了任务的run方法task.run();}catch(RuntimeException x) {  thrown = x;throwx;}catch(Errorx) {  thrown = x;throwx;}catch(Throwable x) {  thrown = x;thrownewError(x);}finally{  afterExecute(task, thrown);}// ==== 关键代码 end ====

可以看到捕捉了异常之后,会再向外抛出,只不过再finally block 中有个 afterExecute() 方法,似乎在这里是可以处理这个异常信息的,进去看看

protectedvoidafterExecute(Runnable r, Throwable t){ }

可以看到 ThreadPoolExecutor#afterExecute() 方法中,是什么都没做的,看来是让使用者通过override这个方法来定制化任务执行之后的逻辑,其中可以包括异常处理。

那么这个异常到底是抛到哪里去了呢。我在一个大佬的文章找到了hotSpot JVM处理线程异常的逻辑,

if(!destroy_vm || JDK_Version::is_jdk12x_version()) {// JSR-166: change call from from ThreadGroup.uncaughtException to// java.lang.Thread.dispatchUncaughtExceptionif(uncaught_exception.not_null()) {//如果有未捕获的异常Handle group(this, java_lang_Thread::threadGroup(threadObj()));      {        KlassHandle recvrKlass(THREAD, threadObj->klass());        CallInfo callinfo;        KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());/* 

        这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法

        template(dispatchUncaughtException_name,            "dispatchUncaughtException")               

        */LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,                                          vmSymbols::dispatchUncaughtException_name(),                                          vmSymbols::throwable_void_signature(),                                          KlassHandle(),false,false, THREAD);        CLEAR_PENDING_EXCEPTION;        methodHandle method = callinfo.selected_method();if(method.not_null()) {          JavaValue result(T_VOID);          JavaCalls::call_virtual(&result,                                  threadObj, thread_klass,                                  vmSymbols::dispatchUncaughtException_name(),                                  vmSymbols::throwable_void_signature(),                                  uncaught_exception,                                  THREAD);        }else{          KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());          JavaValue result(T_VOID);          JavaCalls::call_virtual(&result,                                  group, thread_group,                                  vmSymbols::uncaughtException_name(),                                  vmSymbols::thread_throwable_void_signature(),                                  threadObj,// Arg 1uncaught_exception,// Arg 2THREAD);        }if(HAS_PENDING_EXCEPTION) {          ResourceMark rm(this);          jio_fprintf(defaultStream::error_stream(),"\nException: %s thrown from the UncaughtExceptionHandler"" in thread \"%s\"\n",                pending_exception()->klass()->external_name(),                get_thread_name());          CLEAR_PENDING_EXCEPTION;        }      }    }

代码是C写的,有兴趣可以去全文,根据英文注释能稍微看懂一点

http://hg.openjdk.java.net/jd...

可以看到这里最终会去调用 Thread#dispatchUncaughtException 方法:

/**

    * Dispatch an uncaught exception to the handler. This method is

    * intended to be called only by the JVM.

    */privatevoiddispatchUncaughtException(Throwable e){        getUncaughtExceptionHandler().uncaughtException(this, e);    }

/** * Called by the Java Virtual Machine when a thread in this * thread group stops because of an uncaught exception, and the thread * does not have a specific {@linkThread.UncaughtExceptionHandler} * installed. * */publicvoid uncaughtException(Thread t, Throwable e) {if(parent!=null) {parent.uncaughtException(t, e);        }else{            Thread.UncaughtExceptionHandler ueh =                Thread.getDefaultUncaughtExceptionHandler();if(ueh !=null) {                ueh.uncaughtException(t, e);            }elseif(!(einstanceofThreadDeath)) {//可以看到会打到System.err里面System.err.print("Exception in thread \""+ t.getName() +"\" ");                e.printStackTrace(System.err);            }        }    }

jdk的注释也说明的很清楚了,当一个线程抛出了一个未捕获的异常,JVM会去调用这个方法。如果当前线程没有声明 UncaughtExceptionHandler 成员变量并且重写 uncaughtException 方法的时候,就会看线程所属的线程组(如果有线程组的话)有没有这个类,没有就会打到 System.err 里面。

IBM这篇文章也提倡我们使用 ThreadGroup 提供的 uncaughtException 处理程序来在线程异常终止时进行检测。

https://www.ibm.com/developer...

总结 (解决方法)

从上述源码分析中可以看到,对于本篇的异常“被吃掉”的问题,有以下几种方法

用try-catch 捕捉,一般都是用这种

线程或者线程组对象设置UncaughtExceptionHandler成员变量

Thread t =newThread(r);            t.setUncaughtExceptionHandler((t1, e)->LOGGER.error(t1 +" throws exception: "+ e));returnt;

override 线程池的 afterExecute 方法。

本篇虽然是提出问题的解决方法,但主旨还是分析源码,了解了整个过程中异常的经过的流程,希望能对您产生帮助。

相关文章

网友评论

    本文标题:Java 线程池 之 被“吃掉”的线程异常(附源码分析和解决方法

    本文链接:https://www.haomeiwen.com/subject/payzgqtx.html