美文网首页
资深程序员分享java线程池处理机制(从异常信息处理说起 )

资深程序员分享java线程池处理机制(从异常信息处理说起 )

作者: java高级架构F六 | 来源:发表于2019-11-21 23:00 被阅读0次

    前言

    今天小伙伴遇到个小问题,线程池提交的任务如果没有抓住异常,那么会抛到哪里去,之前倒是没研究过,本着实事求是的原则,看了一下代码。

    正文

    小问题

    考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?:

    ExecutorService threadPool = Executors.newFixedThreadPool(1);

    threadPool.submit(() -> {

    Object obj = null;

    System.out.println(obj.toString());

    });

    threadPool.execute(() -> {

    Object obj = null;

    System.out.println(obj.toString());

    });

    源码解析

    我们下面就来看下代码,其实就是将我们提交过去的可运行的包装成一个未来

    public Future submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

    }

    protected RunnableFuture newTaskFor(Runnable runnable, T value) {

    return new FutureTask(runnable, value);

    }

    public FutureTask(Runnable runnable, V result) {

    this.callable = Executors.callable(runnable, result);

    this.state = NEW; // volatile修饰,保证多线程下的可见性,可以看看Java内存模型

    }

    public static Callable callable(Runnable task, T result) {

    if (task == null)

    throw new NullPointerException();

    return new RunnableAdapter(task, result);

    }

    static final class RunnableAdapter implements Callable {

    final Runnable task;

    final T result;

    RunnableAdapter(Runnable task, T result) {

    this.task = task;

    this.result = result;

    }

    public T call() {

    task.run();

    return result;

    }

    }

    接下来就会实际提交到队列中交给线程池调度处理:

    /**

    * 代码还是很清爽的,一个很典型的生产者/消费者模型,

    * 这里暂不纠结这些细节,那么如果提交到workQueue成功的话,消费者是谁呢?

    * 明显在这个newWorker里搞的鬼,同样细节有兴趣可以自己再去研究,这里我们会发现

    * 核心就是Worker这个内部类

    */

    public void execute(Runnable command) {

    if (command == null)

    throw new NullPointerException();

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

    if (addWorker(command, true))

    return;

    c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

    int recheck = ctl.get();

    if (! isRunning(recheck) && remove(command))

    reject(command);

    else if (workerCountOf(recheck) == 0)

    addWorker(null, false);

    }

    else if (!addWorker(command, false))

    reject(command);

    }

    那么接下来看看线程池核心的流程:

    private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable{

    /** Delegates main run loop to outer runWorker */

    public void run() {

    runWorker(this);

    }

    }

    final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

    //getTask()方法会尝试从队列中抓取数据

    while (task != null || (task = getTask()) != null) {

    w.lock();

    if ((runStateAtLeast(ctl.get(), STOP) ||

    (Thread.interrupted() &&

    runStateAtLeast(ctl.get(), STOP))) &&

    !wt.isInterrupted())

    wt.interrupt();

    try {

    //可覆写此方法打日志埋点之类的

    beforeExecute(wt, task);

    Throwable thrown = null;

    try {

    //简单明了,直接调用run方法

    task.run();

    } catch (RuntimeException x) {

    thrown = x; throw x;

    } catch (Error x) {

    thrown = x; throw x;

    } catch (Throwable x) {

    thrown = x; throw new Error(x);

    } finally {

    afterExecute(task, thrown);

    }

    } finally {

    task = null;

    w.completedTasks++;

    w.unlock();

    }

    }

    completedAbruptly = false;

    } finally {

    processWorkerExit(w, completedAbruptly);

    }

    }

    提交的方式

    那么我们可以这里是直接调用的Run方法,先看Submit的方式,我们知道最终传递过去的是一个FutureTask,也就是说会调用这里的Run方法,我们看看实现:

    public void run() {

    if (state != NEW ||

    !UNSAFE.compareAndSwapObject(this, runnerOffset,

    null, Thread.currentThread()))

    return;

    try {

    Callable c = callable;

    if (c != null && state == NEW) {

    V result;

    boolean ran;

    try {

    result = c.call();

    ran = true;

    } catch (Throwable ex) {

    result = null;

    ran = false;

    //。。。

    setException(ex);

    }

    if (ran)

    set(result);

    }

    } finally {

    //省略

    }

    protected void setException(Throwable t) {

    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

    outcome = t; //赋给了这个变量

    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

    finishCompletion();

    }

    }

    可以看到其实类似于直接吞掉了,这样的话我们调用GET()方法的时候会拿到,比如我们可以重写After Execute方法,从而可以得到实际的异常:

    protected void afterExecute(Runnable r, Throwable t) {

    super.afterExecute(r, t);

    if (t == null && r instanceof Future) {

    try {

    //get这里会首先检查任务的状态,然后将上面的异常包装成ExecutionException

    Object result = ((Future) r).get();

    } catch (CancellationException ce) {

    t = ce;

    } catch (ExecutionException ee) {

    t = ee.getCause();

    } catch (InterruptedException ie) {

    Thread.currentThread().interrupt(); // ignore/reset

    }

    }

    if (t != null){

    //异常处理

    t.printStackTrace();

    }

    }

    执行的方式

    那么如果是直接Exeture的方式有啥不同呢?这样的话传递过去的就直接是Runnable,因此就会直接抛出:

    try {

    task.run();

    } catch (RuntimeException x) {

    thrown = x; throw x;

    } catch (Error x) {

    thrown = x; throw x;

    } catch (Throwable x) {

    thrown = x; throw new Error(x);

    } finally {

    afterExecute(task, thrown);

    }

    那么这里的异常到底会抛出到哪里呢,我们看看JVM具体是怎么处理的:

    if (!destroy_vm || JDK_Version::is_jdk12x_version()) {

    // JSR-166: change call from from ThreadGroup.uncaughtException to

    // java.lang.Thread.dispatchUncaughtException

    if (uncaught_exception.not_null()) {

    //如果有未捕获的异常

    Handle group(this, java_lang_Thread::threadGroup(threadObj()));

    {

    KlassHandle recvrKlass(THREAD, threadObj->klass());

    CallInfo callinfo;

    KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());

    /*

    这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法

    template(dispatchUncaughtException_name, "dispatchUncaughtException")

    */

    LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,

    vmSymbols::dispatchUncaughtException_name(),

    vmSymbols::throwable_void_signature(),

    KlassHandle(), false, false, THREAD);

    CLEAR_PENDING_EXCEPTION;

    methodHandle method = callinfo.selected_method();

    if (method.not_null()) {

    JavaValue result(T_VOID);

    JavaCalls::call_virtual(&result,

    threadObj, thread_klass,

    vmSymbols::dispatchUncaughtException_name(),

    vmSymbols::throwable_void_signature(),

    uncaught_exception,

    THREAD);

    } else {

    KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());

    JavaValue result(T_VOID);

    JavaCalls::call_virtual(&result,

    group, thread_group,

    vmSymbols::uncaughtException_name(),

    vmSymbols::thread_throwable_void_signature(),

    threadObj, // Arg 1

    uncaught_exception, // Arg 2

    THREAD);

    }

    if (HAS_PENDING_EXCEPTION) {

    ResourceMark rm(this);

    jio_fprintf(defaultStream::error_stream(),

    "\nException: %s thrown from the UncaughtExceptionHandler"

    " in thread \"%s\"\n",

    pending_exception()->klass()->external_name(),

    get_thread_name());

    CLEAR_PENDING_EXCEPTION;

    }

    }

    }

    可以看到这里最终会去调用线程#DispatchUncoghtException方法:

    private void dispatchUncaughtException(Throwable e) {

    //默认会调用ThreadGroup的实现

    getUncaughtExceptionHandler().uncaughtException(this, e);

    }

    public void uncaughtException(Thread t, Throwable e) {

    if (parent != null) {

    parent.uncaughtException(t, e);

    } else {

    Thread.UncaughtExceptionHandler ueh =

    Thread.getDefaultUncaughtExceptionHandler();

    if (ueh != null) {

    ueh.uncaughtException(t, e);

    } else if (!(e instanceof ThreadDeath)) {

    //可以看到会打到System.err里面

    System.err.print("Exception in thread \""

    + t.getName() + "\" ");

    e.printStackTrace(System.err);

    }

    }

    }

    这里如果环境是tomcat的话最终会打到catalina.out:

    总结

    对于线程池、包括线程的异常处理推荐一下方式:

    1直接TRY/CATCH,个人基本都是用这种方式

    2线程直接重写整个方法:

    Thread t = new Thread();

    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

    public void uncaughtException(Thread t, Throwable e) {

    LOGGER.error(t + " throws exception: " + e);

    }

    });

    //如果是线程池的模式:

    ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {

    Thread t = new Thread(r);

    t.setUncaughtExceptionHandler(

    (t1, e) -> LOGGER.error(t1 + " throws exception: " + e));

    return t;

    });

    3也可以直接重写protected void afterExecute(Runnable r, Throwable t) { }方法

    相关文章

      网友评论

          本文标题:资深程序员分享java线程池处理机制(从异常信息处理说起 )

          本文链接:https://www.haomeiwen.com/subject/bvbxwctx.html