美文网首页
探究一下线程的真正执行(JVM层面)、Callable、Futu

探究一下线程的真正执行(JVM层面)、Callable、Futu

作者: 阿粒_lxf | 来源:发表于2019-01-28 22:39 被阅读0次

    探究一下线程的真正执行(JVM层面)、Callable、Future以及线程池的执行过程和源码

    本文主要是作者的学习笔记,写得比较简陋,如有错误之处,敬请指正!

    package concurrent;
    
    import java.util.concurrent.*;
    
    /**
     * Demo used to step through {@code FutureTask}: a single pool task that never finishes
     * on its own, and two named threads that wait on its {@link Future} with a timeout.
     *
     * <p>Once both waiters have returned, the task is released and the pool is shut down,
     * so the JVM can exit instead of spinning forever on a non-daemon worker thread.
     *
     * @author LiuXF
     * @date 2018/12/27 14:08:13
     */
    public class InterruptTest {
        /** Amount of {@link #WAIT_UNIT} each waiter blocks in {@code Future.get}. */
        private static final long WAIT_TIMEOUT = 3000;

        // NOTE(review): 3000 microseconds is only 3 ms, so both waiters time out almost
        // immediately. MILLISECONDS may have been intended; the unit is kept unchanged so
        // the demo still matches the get(3000, TimeUnit.MICROSECONDS) call discussed in
        // the walkthrough.
        private static final TimeUnit WAIT_UNIT = TimeUnit.MICROSECONDS;

        /** Keeps the task spinning after its sleep; cleared by {@link #start()} on exit. */
        private volatile boolean flag = true;

        /** Single-threaded pool that runs the never-ending task. */
        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        /**
         * Task that sleeps for five seconds and then busy-waits while {@code flag} is set.
         * The busy loop does not react to interruption; only clearing {@code flag} ends it.
         */
        private class Task implements Callable<String> {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                while (flag) {
                    // intentionally empty: spin until the flag is cleared
                }
                return "2";
            }
        }

        /**
         * Submits the task, lets two named threads wait on its result, and then tears
         * everything down.
         */
        private void start() {
            Future<String> future = executor.submit(new Task());
            Thread t1 = newWaiter("111111111", future);
            Thread t2 = newWaiter("22222222", future);
            try {
                t1.start();
                t2.start();
                t1.join();
                t2.join();
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interrupt.
                Thread.currentThread().interrupt();
            } finally {
                // An interrupt ends the sleep but not the busy loop, so clear the flag too.
                flag = false;
                executor.shutdownNow();
            }
        }

        /**
         * Creates (but does not start) a thread that waits on {@code future} with the
         * configured timeout and prints whatever exception ends the wait.
         *
         * @param name   thread name, handy when inspecting the waiters in a debugger
         * @param future the future to wait on
         * @return the new, not yet started thread
         */
        private static Thread newWaiter(String name, Future<String> future) {
            Thread waiter = new Thread(() -> {
                try {
                    future.get(WAIT_TIMEOUT, WAIT_UNIT);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (ExecutionException | TimeoutException e) {
                    e.printStackTrace();
                }
            });
            waiter.setName(name);
            return waiter;
        }

        public static void main(String[] args) {
            new InterruptTest().start();
        }
    }
    
    

    线程池

    AbstractExecutorService.submit --->

        /**
         * Wraps the given task in a RunnableFuture and hands it to execute().
         *
         * @param task the task to run
         * @return the RunnableFuture created for the task
         * @throws NullPointerException if task is null
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            // Adapt the Runnable (with a null result) into a future that can be executed.
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    

    ThreadPoolExecutor.execute-->

            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    

    ThreadPoolExecutor.addWorker--->

                w = new Worker(firstTask);
                final Thread t = w.thread;//从工厂(线程池)里面获取线程对象 详情看Worker的构造函数getThreadFactory().newThread(this);
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();//这里执行我们Worker的run方法
                        workerStarted = true;
                    }
                }
    

    Thread.start()--->

     try {
                start0();
                started = true;
            } 
    ///也就是新创建的线程启动调用native start0方法,而这些native方法的注册是在Thread对象初始化的时候完成的
        private static native void registerNatives();
        static {
            registerNatives(); //而本地方法 registerNatives 是定义在 Thread.c 文件中的 
        }
    

    Thread.c --->

    /* Table of native methods: Java-side name, JNI signature, and the VM function bound to it. */
    static JNINativeMethod methods[] = {
        {"start0",           "()V",        (void *)&JVM_StartThread},// implemented in jvm.cpp
        {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
        {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
        {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
        {"resume0",          "()V",        (void *)&JVM_ResumeThread},
        {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
        {"yield",            "()V",        (void *)&JVM_Yield},
        {"sleep",            "(J)V",       (void *)&JVM_Sleep},
        {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
        {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
        {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
        {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
        {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
        {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
        {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
        {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
    };
    

    jvm.cpp--->

    JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
      JVMWrapper("JVM_StartThread");
      JavaThread *native_thread = NULL;
    
      // We cannot hold the Threads_lock when we throw an exception,
      // due to rank ordering issues. Example:  we might need to grab the
      // Heap_lock while we construct the exception.
      bool throw_illegal_thread_state = false;
    
      // We must release the Threads_lock before we can post a jvmti event
      // in Thread::start.
      {
        // Ensure that the C++ Thread and OSThread structures aren't freed before
        // we operate.
        MutexLocker mu(Threads_lock);
    
        // Since JDK 5 the java.lang.Thread threadStatus is used to prevent
        // re-starting an already started thread, so we should usually find
        // that the JavaThread is null. However for a JNI attached thread
        // there is a small window between the Thread object being created
        // (with its JavaThread set) and the update to its threadStatus, so we
        // have to check for this
        if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
          throw_illegal_thread_state = true;
        } else {
          // We could also check the stillborn flag to see if this thread was already stopped, but
          // for historical reasons we let the thread detect that itself when it starts running
    
          jlong size =
                 java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
          // Allocate the C++ Thread structure and create the native thread.  The
          // stack size retrieved from java is signed, but the constructor takes
          // size_t (an unsigned type), so avoid passing negative values which would
          // result in really large stacks.
          size_t sz = size > 0 ? (size_t) size : 0;
          native_thread = new JavaThread(&thread_entry, sz);//这里开启一个线程
            
    
          // At this point it may be possible that no osthread was created for the
          // JavaThread due to lack of memory. Check for this situation and throw
          // an exception if necessary. Eventually we may want to change this so
          // that we only grab the lock if the thread was created successfully -
          // then we can also do this check and throw the exception in the
          // JavaThread constructor.
          if (native_thread->osthread() != NULL) {
            // Note: the current thread is not being used within "prepare".
            native_thread->prepare(jthread);
              //注意从这里开始run方法交给另一个线程去处理的,我们的主代码可能已经往下执行了
          }
        }
      }
    // Execution happens here: this is the entry function handed to the JavaThread
    // constructor. It makes a virtual call to the "run" method (void signature) on the
    // thread's Java object.
    static void thread_entry(JavaThread* thread, TRAPS) {
        HandleMark hm(THREAD);
        Handle obj(THREAD, thread->threadObj());
        JavaValue result(T_VOID);
        JavaCalls::call_virtual(&result,obj,
        KlassHandle(THREAD,SystemDictionary::Thread_klass()),
        vmSymbolHandles::run_method_name(),    // LOOK! the method name comes from the macro definition in vmSymbols.hpp
        vmSymbolHandles::void_method_signature(),THREAD);
     }
    

    vmSymbols.hpp --->

    class vmSymbolHandles: AllStatic {
       ...
        template(run_method_name,"run")  //LOOK!!! 这里决定了调用的方法名称是 “run”!
       ...
    }
    

    Worker.run--->

        /**
         * Pool worker: a Runnable that owns its thread and doubles as a simple
         * non-reentrant lock (state 0 = unlocked, 1 = locked, -1 = not yet started).
         */
        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask; // this is the FutureTask object we passed in from outside
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // The worker itself is the Runnable of the new thread, so starting the
                // thread ends up in run() below.
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            // Single CAS from 0 to 1; fails if the state is anything other than 0.
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            // Unconditionally resets the state to 0 and clears the owner.
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            // Interrupts the worker thread only if the state is >= 0, the thread exists,
            // and it is not already interrupted.
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    ThreadPoolExecutor.runWorker

    /**
     * Main loop of a worker: runs the worker's first task, then keeps pulling tasks from
     * getTask() until it returns null, and finally reports the exit via processWorkerExit.
     *
     * @param w the worker whose thread is executing this loop
     */
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            // Stays true unless the loop below ends normally.
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run(); // at long last: this is the run method of our FutureTask
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    还记得我们之前传进来的任务是什么类型吗?是 FutureTask 类型。

    futureTask.run --->

        /**
         * Runs the wrapped callable once and records either its result or the exception
         * it threw. Returns immediately if the state is not NEW or another thread has
         * already claimed the runner field.
         */
        public void run() {
            // Claim the task: only the thread that CASes runner from null proceeds.
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result); // stores the return value; it comes from result = c.call() above, i.e. the call method of our Callable. This work is handed to another thread, so the main thread is not affected
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    futureTask.set

    /**
     * Stores the given value as the outcome, but only if the state can be moved from
     * NEW to COMPLETING; then marks the state NORMAL and runs finishCompletion().
     *
     * @param v the value to store
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    

    futureTask.finishCompletion

        /**
         * Detaches the whole waiters list, unparks every thread recorded in it, then
         * calls done() and clears the callable reference.
         */
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                // Retry from the top if another thread changed the list head meanwhile.
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            // wake up the threads parked in the wait queue (they were parked inside future.get, shown below)
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    我们在主代码调用

    future.get(3000, TimeUnit.MICROSECONDS);
    

    接着看看future.get怎么实现的

        /**
         * Waits up to the given timeout for the task to get past COMPLETING, then
         * returns whatever report(s) yields for the final state.
         *
         * @param timeout maximum time to wait
         * @param unit    unit of the timeout
         * @throws NullPointerException if unit is null
         * @throws TimeoutException if the state is still at or below COMPLETING after waiting
         */
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            // Only block when the task has not completed yet; awaitDone takes nanoseconds.
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    // This mainly checks the task state: while it has not passed COMPLETING the caller keeps waiting, bounded by the timeout when one is given
    

    这里我们不妨再来看看awaitDone这个方法,它到底阻塞的是什么

    其实阻塞的是当前调用get的线程:每次循环依次往下判断这几个条件分支,经过几次循环之后线程会被park起来;如果有多个线程同时等待,它们对应的WaitNode都会被放进 private volatile WaitNode waiters; 这个链表里面

        /**
         * Blocks the calling thread until the state passes COMPLETING, the thread is
         * interrupted, or (when timed) the deadline expires. Each pass through the loop
         * takes exactly one branch, so enqueueing and parking happen over several passes.
         *
         * @param timed true to wait at most nanos nanoseconds
         * @param nanos time to wait when timed
         * @return the state observed on completion or on timeout
         * @throws InterruptedException if the calling thread was interrupted
         */
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    // Finished: clear this waiter's thread reference and return the state.
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    // Push the node onto the head of the waiters list; retried on CAS failure.
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        // Deadline passed: remove the node and return the current state.
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    
    1548685045886.png

    线程111111111的等待队列的waiters的样子

    1548685104701.png

    线程22222222的等待队列的waiters的样子

    相关文章

      网友评论

          本文标题:探究一下线程的真正执行(JVM层面)、Callable、Futu

          本文链接:https://www.haomeiwen.com/subject/uenzjqtx.html