美文网首页Java
多线程基础(七):关于HotSpot中notify方法不具备随机

多线程基础(七):关于HotSpot中notify方法不具备随机

作者: 冬天里的懒喵 | 来源:发表于2020-09-09 14:48 被阅读0次

    [toc]

    在前面关于wait/notify及notifyAll方法的时候,notify在源码的注释中说到notify选择唤醒的线程是任意的,但是依赖于具体实现的jvm。原文如下:

     * Wakes up a single thread that is waiting on this object's
     * monitor. If any threads are waiting on this object, one of them
     * is chosen to be awakened. The choice is arbitrary and occurs at
     * the discretion of the implementation. A thread waits on an object's
     * monitor by calling one of the {@code wait} methods.
    

    但是在很多博客或者面试中聊到notify和notifyAll的时候,很多人都说notify是随机的。那么真的是随机吗 ?我们现在来对这个情况进行实验验证。

    1.实验一

    定义50个线程,分别让其依次wait,之后再进行notify。为了对照明显,特别将wait之后加了sleep,这样线程能按id依次wait。wait之前不存在锁竞争。

    package com.dhb.notify;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class NotifyTest{
    
        private static List<String> before = new LinkedList<>();
        private static List<String> after = new LinkedList<>();
    
        private static Object lock = new Object();
    
    
        public static void main(String[] args) throws InterruptedException{
    
            for(int i=0;i<50;i++){
                String threadName = Integer.toString(i);
                new Thread(() -> {
                    synchronized (lock) {
                        String cthreadName = Thread.currentThread().getName();
                        System.out.println("Thread ["+cthreadName+"] wait.");
                        before.add(cthreadName);
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Thread ["+cthreadName+"] weak up.");
                        after.add(cthreadName);
                    }
                },threadName).start();
                TimeUnit.MILLISECONDS.sleep(50);
            }
    
            TimeUnit.SECONDS.sleep(1);
    
            for(int i=0;i<50;i++){
                synchronized (lock) {
                    lock.notify();
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            }
            TimeUnit.SECONDS.sleep(1);
    
            System.out.println(before.toString());
            System.out.println(after.toString());
        }
    }
    

    上面代码的执行结果:

    Thread [0] wait.
    Thread [1] wait.
    Thread [2] wait.
    Thread [3] wait.
    Thread [4] wait.
    Thread [5] wait.
    Thread [6] wait.
    Thread [7] wait.
    Thread [8] wait.
    Thread [9] wait.
    Thread [10] wait.
    Thread [11] wait.
    Thread [12] wait.
    Thread [13] wait.
    Thread [14] wait.
    Thread [15] wait.
    Thread [16] wait.
    Thread [17] wait.
    Thread [18] wait.
    Thread [19] wait.
    Thread [20] wait.
    Thread [21] wait.
    Thread [22] wait.
    Thread [23] wait.
    Thread [24] wait.
    Thread [25] wait.
    Thread [26] wait.
    Thread [27] wait.
    Thread [28] wait.
    Thread [29] wait.
    Thread [30] wait.
    Thread [31] wait.
    Thread [32] wait.
    Thread [33] wait.
    Thread [34] wait.
    Thread [35] wait.
    Thread [36] wait.
    Thread [37] wait.
    Thread [38] wait.
    Thread [39] wait.
    Thread [40] wait.
    Thread [41] wait.
    Thread [42] wait.
    Thread [43] wait.
    Thread [44] wait.
    Thread [45] wait.
    Thread [46] wait.
    Thread [47] wait.
    Thread [48] wait.
    Thread [49] wait.
    Thread [0] weak up.
    Thread [19] weak up.
    Thread [18] weak up.
    Thread [17] weak up.
    Thread [16] weak up.
    Thread [15] weak up.
    Thread [14] weak up.
    Thread [13] weak up.
    Thread [12] weak up.
    Thread [11] weak up.
    Thread [10] weak up.
    Thread [9] weak up.
    Thread [8] weak up.
    Thread [7] weak up.
    Thread [6] weak up.
    Thread [5] weak up.
    Thread [4] weak up.
    Thread [3] weak up.
    Thread [2] weak up.
    Thread [1] weak up.
    Thread [44] weak up.
    Thread [43] weak up.
    Thread [42] weak up.
    Thread [41] weak up.
    Thread [40] weak up.
    Thread [39] weak up.
    Thread [38] weak up.
    Thread [37] weak up.
    Thread [36] weak up.
    Thread [35] weak up.
    Thread [34] weak up.
    Thread [33] weak up.
    Thread [32] weak up.
    Thread [31] weak up.
    Thread [30] weak up.
    Thread [29] weak up.
    Thread [28] weak up.
    Thread [27] weak up.
    Thread [26] weak up.
    Thread [25] weak up.
    Thread [24] weak up.
    Thread [23] weak up.
    Thread [22] weak up.
    Thread [21] weak up.
    Thread [20] weak up.
    Thread [47] weak up.
    Thread [46] weak up.
    Thread [45] weak up.
    Thread [49] weak up.
    Thread [48] weak up.
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    [0, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 47, 46, 45, 49, 48]
    

    根据上面的执行结果,貌似weakup的顺序是随机的。那么要是我们这么快就得到了结果,哪岂不是本文的标题有问题。我们再来看看第二个实验。

    2.实验二

    package com.dhb.notify;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class NotifyTest{
    
        private static List<String> before = new LinkedList<>();
        private static List<String> after = new LinkedList<>();
    
        private static Object lock = new Object();
    
    
        public static void main(String[] args) throws InterruptedException{
    
            for(int i=0;i<50;i++){
                String threadName = Integer.toString(i);
                new Thread(() -> {
                    synchronized (lock) {
                        String cthreadName = Thread.currentThread().getName();
                        System.out.println("Thread ["+cthreadName+"] wait.");
                        before.add(cthreadName);
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Thread ["+cthreadName+"] weak up.");
                        after.add(cthreadName);
                    }
                },threadName).start();
                TimeUnit.MILLISECONDS.sleep(50);
            }
    
            TimeUnit.SECONDS.sleep(1);
    
            for(int i=0;i<50;i++){
                synchronized (lock) {
                    lock.notify();
                }
                TimeUnit.MILLISECONDS.sleep(10);
            }
            TimeUnit.SECONDS.sleep(1);
    
            System.out.println(before.toString());
            System.out.println(after.toString());
        }
    }
    

    执行结果如下:

    Thread [0] wait.
    Thread [1] wait.
    Thread [2] wait.
    Thread [3] wait.
    Thread [4] wait.
    Thread [5] wait.
    Thread [6] wait.
    Thread [7] wait.
    Thread [8] wait.
    Thread [9] wait.
    Thread [10] wait.
    Thread [11] wait.
    Thread [12] wait.
    Thread [13] wait.
    Thread [14] wait.
    Thread [15] wait.
    Thread [16] wait.
    Thread [17] wait.
    Thread [18] wait.
    Thread [19] wait.
    Thread [20] wait.
    Thread [21] wait.
    Thread [22] wait.
    Thread [23] wait.
    Thread [24] wait.
    Thread [25] wait.
    Thread [26] wait.
    Thread [27] wait.
    Thread [28] wait.
    Thread [29] wait.
    Thread [30] wait.
    Thread [31] wait.
    Thread [32] wait.
    Thread [33] wait.
    Thread [34] wait.
    Thread [35] wait.
    Thread [36] wait.
    Thread [37] wait.
    Thread [38] wait.
    Thread [39] wait.
    Thread [40] wait.
    Thread [41] wait.
    Thread [42] wait.
    Thread [43] wait.
    Thread [44] wait.
    Thread [45] wait.
    Thread [46] wait.
    Thread [47] wait.
    Thread [48] wait.
    Thread [49] wait.
    Thread [0] weak up.
    Thread [1] weak up.
    Thread [2] weak up.
    Thread [3] weak up.
    Thread [4] weak up.
    Thread [5] weak up.
    Thread [6] weak up.
    Thread [7] weak up.
    Thread [8] weak up.
    Thread [9] weak up.
    Thread [10] weak up.
    Thread [11] weak up.
    Thread [12] weak up.
    Thread [13] weak up.
    Thread [14] weak up.
    Thread [15] weak up.
    Thread [16] weak up.
    Thread [17] weak up.
    Thread [18] weak up.
    Thread [19] weak up.
    Thread [20] weak up.
    Thread [21] weak up.
    Thread [22] weak up.
    Thread [23] weak up.
    Thread [24] weak up.
    Thread [25] weak up.
    Thread [26] weak up.
    Thread [27] weak up.
    Thread [28] weak up.
    Thread [29] weak up.
    Thread [30] weak up.
    Thread [31] weak up.
    Thread [32] weak up.
    Thread [33] weak up.
    Thread [34] weak up.
    Thread [35] weak up.
    Thread [36] weak up.
    Thread [37] weak up.
    Thread [38] weak up.
    Thread [39] weak up.
    Thread [40] weak up.
    Thread [41] weak up.
    Thread [42] weak up.
    Thread [43] weak up.
    Thread [44] weak up.
    Thread [45] weak up.
    Thread [46] weak up.
    Thread [47] weak up.
    Thread [48] weak up.
    Thread [49] weak up.
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    

    这个代码线程的weak up顺序就是与wait顺序一致的。

    3.问题分析

    为什么上述两个代码的执行结果会如此不同,代码貌似都相差不多。那么问题出在哪呢?先看看各位读者能否自行发现问题所在。
    这也是我在之前为了弄这个例子所犯的问题。
    对,问题就出在这个sleep上,这个sleep是在synchronized里面还是外面sleep,区别非常大。我们先看看代码。
    实验一:

    for(int i=0;i<50;i++){
        synchronized (lock) {
            lock.notify();
            TimeUnit.MILLISECONDS.sleep(10);
        }
    }
    

    实验二:

    for(int i=0;i<50;i++){
        synchronized (lock) {
            lock.notify();
        }
        TimeUnit.MILLISECONDS.sleep(10);
    }
    

    就是上述这几行代码,可能造成的结果就不一样。为什么实验一中的结果会出现随机呢?那是因为,当我们执行notify之后,由于sleep在symchronized内部,因此没有释放锁。那么被通知到的线程,就会进入waitSet队列,之后当通知线程进入循环之后,可能会同时竞争synchronized,由于synchronized的BLOCK到RUNNING状态是非公平的。这个通知线程可能再次又获得锁,进行第二次notify。
    整个过程复盘如下:

    • 1.初始状态,用N表示通知线程,在notify没执行之前,状态如下;


      执行notify之前
    • 2.此后执行notify,假定C1被通知到,进入waitSet队列。


      执行notify
    • 3.执行完notify之后会进行sleep,此时仍然由N持有锁,在这之后,sleep结束,N将释放锁。N还会继续执行。当N再次进入循环的时候,此时,N就会进入BLOCK对synchronized的资源进行竞争。那么需要注意的是,这个时候,之前处于BLOCK状态的线程不一定就会执行,因为这是在并发条件下进行的。很大概率的情况下,都会出现同时位于BLOCK队列的情况。


      执行notify之后
    • 4.那么由于synchronized实际上不是公平锁,其锁竞争的机制具有随机性,那么此时有可能线程N再次获得锁。就是下图这种情况。


      N再次抢到锁
    • 5.线程N获得锁之后,会再次notify一个线程到WaitSet队列。


      再次调用notify
    • 6.与第三步类似,可能此时Nsleep完成之后进入了BLOCK而C1、C2也在WaitSet队列等待。


      N sleep完毕之后重新竞争
    • 7.那么此时,有一种情况就会出现,那就是C2抢到了锁。这样C2就会在C1之前被weak up。


      C2抢到锁,weakup

    通过上述分析,就很容易得出实验一种唤醒次序随机的原因了。我们再来看看实验一的weak up输出:

    [0, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 47, 46, 45, 49, 48]
    

    根据这个输出,可以看到第二位为19,也就是说,当线程编号为19的线程weakup的时候,已经调用了20次notify。
    对于实验二,则由于在每次notify之后,释放锁之后,再进入sleep,因此通知线程不会和WaitSet中的线程竞争锁。那么实验二中实际上得到的顺序,就是notify的顺序。
    通过实验二不难看出,notify实际上并不是随机的,而是顺序。

    4.HotSpot源码

    为了进一步证明上述问题,我们可以对HotSpot源码进行分析。
    hotspot源码
    点zip就能下载zip格式的源码:

    openJDK下载

    在此之后,我们知道,synchronized的wait和notify是位于ObjectMonitor.cpp中。
    我们来看看这个具体notify的方法:

    // Consider:
    // If the lock is cool (cxq == null && succ == null) and we're on an MP system
    // then instead of transferring a thread from the WaitSet to the EntryList
    // we might just dequeue a thread from the WaitSet and directly unpark() it.
    
    void ObjectMonitor::notify(TRAPS) {
      CHECK_OWNER();
      if (_WaitSet == NULL) {
         TEVENT (Empty-Notify) ;
         return ;
      }
      DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
    
      int Policy = Knob_MoveNotifyee ;
    
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
      //重点再这里,是调用的DequeueWaiter方法
      ObjectWaiter * iterator = DequeueWaiter() ;
      if (iterator != NULL) {
         TEVENT (Notify1 - Transfer) ;
         guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
         guarantee (iterator->_notified == 0, "invariant") ;
         if (Policy != 4) {
            iterator->TState = ObjectWaiter::TS_ENTER ;
         }
         iterator->_notified = 1 ;
         Thread * Self = THREAD;
         iterator->_notifier_tid = Self->osthread()->thread_id();
    
         ObjectWaiter * List = _EntryList ;
         if (List != NULL) {
            assert (List->_prev == NULL, "invariant") ;
            assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            assert (List != iterator, "invariant") ;
         }
    
         if (Policy == 0) {       // prepend to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         } else
         if (Policy == 1) {      // append to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                // CONSIDER:  finding the tail currently requires a linear-time walk of
                // the EntryList.  We can make tail access constant-time by converting to
                // a CDLL instead of using our current DLL.
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } else
         if (Policy == 2) {      // prepend to cxq
             // prepend to cxq
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) {
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
             }
         } else
         if (Policy == 3) {      // append to cxq
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         } else {
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;
         }
    
         if (Policy < 4) {
           iterator->wait_reenter_begin(this);
         }
    
         // _WaitSetLock protects the wait queue, not the EntryList.  We could
         // move the add-to-EntryList operation, above, outside the critical section
         // protected by _WaitSetLock.  In practice that's not useful.  With the
         // exception of  wait() timeouts and interrupts the monitor owner
         // is the only thread that grabs _WaitSetLock.  There's almost no contention
         // on _WaitSetLock so it's not profitable to reduce the length of the
         // critical section.
      }
    
      Thread::SpinRelease (&_WaitSetLock) ;
    
      if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
         ObjectMonitor::_sync_Notifications->inc() ;
      }
    }
    

    notify过程调用的是DequeueWaiter方法:

    inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
      // dequeue the very first waiter
      ObjectWaiter* waiter = _WaitSet;
      if (waiter) {//判断_WaitSet是否为空,否则,将第一个元素调用Dequeue方法。
        DequeueSpecificWaiter(waiter);
      }
      return waiter;
    }
    

    实际上是将_WaitSet中的第一个元素进行出队操作。
    这也说明了notify是个顺序操作。具有公平性。

    5.总结

    经过上问两个实验分析以及查看源码可以说明:

    • 1.在HotSpot中,notify是顺序执行的,从等待队列中将队首元素出队。至于其他jvm暂时也没接触到,但是对于HotSpot确实是这样的。因此下次在有面试官问notify和notifyAll的区别的时候,希望不再是回答随机性。
    • 2.synchronized是非公平锁,本文就是最好的证明。虽然没有对源码进行分析。由于篇幅限制此处就不一一展开了。
    • 3.最终要的一点是,不要迷信权威,要敢于质疑,如果得到一个结论要反复问为什么。不行我们可以看源码。源码是不会撒谎的,我们也可以通过实验证明。

    相关文章

      网友评论

        本文标题:多线程基础(七):关于HotSpot中notify方法不具备随机

        本文链接:https://www.haomeiwen.com/subject/wcmeektx.html