美文网首页Java开发那些事
深度探险线程池复用原理

深度探险线程池复用原理

作者: 他是人间惆怅客 | 来源:发表于2020-07-26 22:47 被阅读0次

    一. java如何实现run()方法回调:

    通常我们启动一个线程有两种方式,但是不管你通过start一个Thread还是通过实现Runnable的方式创建线程,都要实现它的run方法,run方法只是一个回调方法,那jvm什么时候去调用这个方法呢?

    在java层面创建一个线程,本质上就是创建了一个native线程,本地线程对应到本地代码,java提供了一个线程统一函数, 由jvm调用java线程方法.启动一个线程实际上调用了start0()方法,而start0()实际调用了一个native方法.

    public synchronized void start() {
            /**
             * This method is not invoked for the main method thread or "system"
             * group threads created/set up by the VM. Any new functionality added
             * to this method in the future may have to also be added to the VM.
             *
             * A zero status value corresponds to state "NEW".
             */
            if (threadStatus != 0)
                throw new IllegalThreadStateException();
    
            /* Notify the group that this thread is about to be started
             * so that it can be added to the group's list of threads
             * and the group's unstarted count can be decremented. */
            group.add(this);
    
            boolean started = false;
            try {
                start0();
                started = true;
            } finally {
                try {
                    if (!started) {
                        group.threadStartFailed(this);
                    }
                } catch (Throwable ignore) {
                    /* do nothing. If start0 threw a Throwable then
                      it will be passed up the call stack */
                }
            }
        }
        
         private native void start0();
    
    • start0()其实是由Thread这个类里面的静态的代码块registerNatives()注册进去的,几乎所有的native thread都是由它注册进去的.registerNatives在class load到jvm时候就被注册到本地方法.native修饰的是底层实现的方法,源码一般是由c或者c++实现.
    public
    class Thread implements Runnable {
        /* Make sure registerNatives is the first thing <clinit> does. */
        private static native void registerNatives();
        static {
            registerNatives();
        }
    
    }
    
    • 本地方法registerNatives()实际是定义在Thread.c这个文件中
    JNIEXPORT void JNICALL 
    Java_Java_lang_Thread_registerNatives (JNIEnv *env, jclass cls){ 
     (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods)); 
    } 
    static JNINativeMethod methods[] = { 
     {"start0", "()V",(void *)&JVM_StartThread}, 
     {"stop0", "(" OBJ ")V", (void *)&JVM_StopThread}, 
     {"isAlive","()Z",(void *)&JVM_IsThreadAlive}, 
     {"suspend0","()V",(void *)&JVM_SuspendThread}, 
     {"resume0","()V",(void *)&JVM_ResumeThread}, 
     {"setPriority0","(I)V",(void *)&JVM_SetThreadPriority}, 
     {"yield", "()V",(void *)&JVM_Yield}, 
     {"sleep","(J)V",(void *)&JVM_Sleep}, 
     {"currentThread","()" THD,(void *)&JVM_CurrentThread}, 
     {"countStackFrames","()I",(void *)&JVM_CountStackFrames}, 
     {"interrupt0","()V",(void *)&JVM_Interrupt}, 
     {"isInterrupted","(Z)Z",(void *)&JVM_IsInterrupted}, 
     {"holdsLock","(" OBJ ")Z",(void *)&JVM_HoldsLock}, 
     {"getThreads","()[" THD,(void *)&JVM_GetAllThreads}, 
     {"dumpThreads","([" THD ")[[" STE, (void *)&JVM_DumpThreads}, 
    };
    

    JNINativeMethod定义了一个静态数组,调用start0,需要调用JVM_StartThread方法,而JVM_StartThread方法是由jvm里面thread_entry调用,具体可以阅读jvm源码实现,这一部分还未深入研究.

    二. 线程池复用原理:

    2.1 AbstractExecutorService

    • 本文从submit一个Runnable说起,我们可以看到提交一个任务实际上它内部还是调用了execute方法,只是帮我们用Future封装了一下线程执行结果.
    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
    

    2.2 execute

    • 如果运行的线程数小于核心线程池配置大小,开启一个线程
    • 如果核心线程池都处于运行状态,任务丢到队列中去排队.实际上在这里,作者做了严谨的判断,采用双重检查方式,因为作者认为在上一次check后可能这个任务被kill掉或者被线程池shut down了,所以重新检查一下运行状态,必要时会滚排队,如果没有新线程stopped就返回新线程
    • 如果队列已满拒绝
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    2.3 addWorker

    只要new Worker(Runnable firstTask),就会new一个线程,并且new一个thread的时候将这个内部类本身this传入进去当task,然后调用了start方法去开启这个线程

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    2.4 Worker

    线程启动的时候我们将内部类worker对象传入进去了,内部类Worker是实现了runable接口的,重写了run()方法,线程获得cpu时间片以后, java虚拟机就会执行Worker中的run方法,run方法里面调用runWorker(this),这里的this就是指的就是当前的Worker对象

    private final class Worker
         extends AbstractQueuedSynchronizer
         implements Runnable
     {
         /**
          * This class will never be serialized, but we provide a
          * serialVersionUID to suppress a javac warning.
          */
         private static final long serialVersionUID = 6138294804551838833L;
    
         /** Thread this worker is running in.  Null if factory fails. */
         final Thread thread;
         /** Initial task to run.  Possibly null. */
         Runnable firstTask;
         /** Per-thread task counter */
         volatile long completedTasks;
    
         /**
          * Creates with given first task and thread from ThreadFactory.
          * @param firstTask the first task (null if none)
          */
         Worker(Runnable firstTask) {
             setState(-1); // inhibit interrupts until runWorker
             this.firstTask = firstTask;
             this.thread = getThreadFactory().newThread(this);
         }
    
         /** Delegates main run loop to outer runWorker  */
         public void run() {
             runWorker(this);
         }
    
         // Lock methods
         //
         // The value 0 represents the unlocked state.
         // The value 1 represents the locked state.
    
         protected boolean isHeldExclusively() {
             return getState() != 0;
         }
    
         protected boolean tryAcquire(int unused) {
             if (compareAndSetState(0, 1)) {
                 setExclusiveOwnerThread(Thread.currentThread());
                 return true;
             }
             return false;
         }
    
         protected boolean tryRelease(int unused) {
             setExclusiveOwnerThread(null);
             setState(0);
             return true;
         }
    
         public void lock()        { acquire(1); }
         public boolean tryLock()  { return tryAcquire(1); }
         public void unlock()      { release(1); }
         public boolean isLocked() { return isHeldExclusively(); }
    
         void interruptIfStarted() {
             Thread t;
             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                 try {
                     t.interrupt();
                 } catch (SecurityException ignore) {
                 }
             }
         }
     }
    
    

    2.5 runWorker

    这里是一个while循环,当task不为空或者 getTask()不为空,核心逻辑在getTask()里面

    final void runWorker(Worker w) {
         Thread wt = Thread.currentThread();
         Runnable task = w.firstTask;
         w.firstTask = null;
         w.unlock(); // allow interrupts
         boolean completedAbruptly = true;
         try {
             while (task != null || (task = getTask()) != null) {
                 w.lock();
                 // If pool is stopping, ensure thread is interrupted;
                 // if not, ensure thread is not interrupted.  This
                 // requires a recheck in second case to deal with
                 // shutdownNow race while clearing interrupt
                 if ((runStateAtLeast(ctl.get(), STOP) ||
                      (Thread.interrupted() &&
                       runStateAtLeast(ctl.get(), STOP))) &&
                     !wt.isInterrupted())
                     wt.interrupt();
                 try {
                     beforeExecute(wt, task);
                     Throwable thrown = null;
                     try {
                         task.run();
                     } catch (RuntimeException x) {
                         thrown = x; throw x;
                     } catch (Error x) {
                         thrown = x; throw x;
                     } catch (Throwable x) {
                         thrown = x; throw new Error(x);
                     } finally {
                         afterExecute(task, thrown);
                     }
                 } finally {
                     task = null;
                     w.completedTasks++;
                     w.unlock();
                 }
             }
             completedAbruptly = false;
         } finally {
             processWorkerExit(w, completedAbruptly);
         }
     }
    

    2.6 getTask

    从队列中获取task,如果核心线程允许超时(默认false,不超时)或者工作线程数大于核心线程池,采取workQueue.poll方式从任务队列中poll数据,最大waitkeepAliveTime时间,没有返回null;否则采取take方式获取任务,take是阻塞式获取任务,这个就是核心线程池默认情况下,就是空闲也一直阻塞的原因,直到队列里有任务唤醒阻塞线程.

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    

    相关文章

      网友评论

        本文标题:深度探险线程池复用原理

        本文链接:https://www.haomeiwen.com/subject/qtfnlktx.html