美文网首页
AQS中LockSupport与Synchronized中par

AQS中LockSupport与Synchronized中par

作者: justonemoretry | 来源:发表于2022-05-10 09:38 被阅读0次

    1.简介

    使用LockSupport的线程会与一个许可关联,其实就像是一个二元信号量(意思就是只有一个许可证可以使用),如果这个许可没有被占用,那么当前线程可以获得许可并继续执行,如果许可以已经被占用,则当前线程就会被阻塞,然后等待许可的获取。注意:许可默认是被占用的!
    下面是释放许可-获取许可的一个例子,能正常输出b。

    public static void main(String[] args) {
            Thread thread = Thread.currentThread();
            LockSupport.unpark(thread);//释放许可
            LockSupport.park();// 获取许可
            System.out.println("b");
        }
    

    1.1 park方法分析

    除非许可证可用,否则出于线程调度目的禁用当前线程。 如果许可证可用,则该许可证被消耗,呼叫立即返回;否则,出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一:

    • 其他线程以当前线程为目标调用 unpark
    • 其他线程中断当前线程
    • 调用不合逻辑地返回
      此方法不报告导致方法返回的原因。调用方应首先重新检查导致线程停止的条件。调用方还可以在返回时确定线程的中断状态。
     /**
         * Disables the current thread for thread scheduling purposes unless the
         * permit is available.
         *
         * <p>If the permit is available then it is consumed and the call returns
         * immediately; otherwise
         * the current thread becomes disabled for thread scheduling
         * purposes and lies dormant until one of three things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread upon return.
         *
         * @param blocker the synchronization object responsible for this
         *        thread parking
         * @since 1.6
         */
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            // 记录当前线程阻塞的原因,lock调用的blocker是对应的同步器
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            // 线程唤醒后,去掉阻塞原因
            setBlocker(t, null);
        }
    

    1.2 unpark方法分析

    使给定线程的许可证可用(如果尚未可用)。如果线程在 park上被阻塞,那么它将解除阻塞。否则,它对 park 的下一次呼叫保证不会被阻塞。如果给定的线程尚未启动,则不能保证此操作有任何效果。

    2.JVM源码分析

    2.1 park方法源码解析

    在 LockSupport 的底层主要是调用 Unsafa 类的 park, unpark 方法实现如下:

    UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
      UnsafeWrapper("Unsafe_Park");
      EventThreadPark event;
    #ifndef USDT2
      HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
    #else /* USDT2 */
       HOTSPOT_THREAD_PARK_BEGIN(
                                 (uintptr_t) thread->parker(), (int) isAbsolute, time);
    #endif /* USDT2 */
      JavaThreadParkedState jtps(thread, time != 0);
      // 这里调用线程持有的parker实例中的park方法
      thread->parker()->park(isAbsolute != 0, time);
    #ifndef USDT2
      HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
    #else /* USDT2 */
      HOTSPOT_THREAD_PARK_END(
                              (uintptr_t) thread->parker());
    #endif /* USDT2 */
      if (event.should_commit()) {
        oop obj = thread->current_park_blocker();
        event.set_klass((obj != NULL) ? obj->klass() : NULL);
        event.set_timeout(time);
        event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
        event.commit();
      }
    UNSAFE_END
    
    UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
      UnsafeWrapper("Unsafe_Unpark");
      Parker* p = NULL;
      if (jthread != NULL) {
        oop java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          jlong lp = java_lang_Thread::park_event(java_thread);
          if (lp != 0) {
            // This cast is OK even though the jlong might have been read
            // non-atomically on 32bit systems, since there, one word will
            // always be zero anyway and the value set is always the same
            p = (Parker*)addr_from_java(lp);
          } else {
            // Grab lock if apparently null or using older version of library
            MutexLocker mu(Threads_lock);
            java_thread = JNIHandles::resolve_non_null(jthread);
            if (java_thread != NULL) {
              JavaThread* thr = java_lang_Thread::thread(java_thread);
              if (thr != NULL) {
                p = thr->parker();
                if (p != NULL) { // Bind to Java thread for next time.
                  java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
                }
              }
            }
          }
        }
      }
      if (p != NULL) {
    #ifndef USDT2
        HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
    #else /* USDT2 */
        HOTSPOT_THREAD_UNPARK(
                              (uintptr_t) p);
    #endif /* USDT2 */ 调用unpark方法
        p->unpark();
      }
    UNSAFE_END
    

    每个线程对象都有一个 Parker 实例

    • Parker 类继承 os::PlatformParker,应该是一个针对不同操作系统适配的
    • 有一个 _counter 属性,可以理解为是否可以调用 park 方法的许可证,只有 _count > 0 的时候才能调用;
    • 提供了公开的 park 和 unpark 方法
    • 每个线程都有自己的parker实例,那么为什么还要加锁解锁呢,这个地方我理解是因为其它线程unpark的时候也可以操作被唤醒线程的parker实例。
    // JSR166 per-thread parker
    private:
      Parker*    _parker;
    public:
      Parker*     parker() { return _parker; }
    

    Parker类的定义

    class Parker : public os::PlatformParker {
    private:
     // 许可
      volatile int _counter ;
      Parker * FreeNext ;
      JavaThread * AssociatedWith ; // Current association
    
    public:
      Parker() : PlatformParker() {
        _counter       = 0 ;
        FreeNext       = NULL ;
        AssociatedWith = NULL ;
      }
    protected:
      ~Parker() { ShouldNotReachHere(); }
    public:
      // For simplicity of interface with Java, all forms of park (indefinite,
      // relative, and absolute) are multiplexed into one call.
      // 无限期、相对时间、绝对时间在一个方法中
      void park(bool isAbsolute, jlong time);
      void unpark();
    
      // Lifecycle operators
      static Parker * Allocate (JavaThread * t) ;
      static void Release (Parker * e) ;
    private:
      static Parker * volatile FreeList ;
      static volatile int ListLock ;
    
    };
    

    linux系统中PlatformParker的实现

    class PlatformParker : public CHeapObj<mtInternal> {
      protected:
        enum {
            REL_INDEX = 0,
            ABS_INDEX = 1
        };
        int _cur_index;  // which cond is in use: -1, 0, 1
        // 互斥量
        pthread_mutex_t _mutex [1] ;
        // 条件队列
        pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.
    
      public:       // TODO-FIXME: make dtor private
        ~PlatformParker() { guarantee (0, "invariant") ; }
    
      public:
        PlatformParker() {
          int status;
          status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
          assert_status(status == 0, status, "cond_init rel");
          status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
          assert_status(status == 0, status, "cond_init abs");
          status = pthread_mutex_init (_mutex, NULL);
          assert_status(status == 0, status, "mutex_init");
          _cur_index = -1; // mark as unused
        }
    };
    

    Parker在不同的操作系统中有不同的实现,下面是在oss_linux系统中的实现过程

    • 判断是否需要阻塞等待,如果已经是 _counter >0, 不需要等待,将 _counter = 0 , 返回
    • 如果 1 不成立,构造当前线程的 ThreadBlockInVM ,检查 _counter > 0 是否成立,成立则将 _counter 设置为 0, unlock mutex 返回;
    • 如果 2 不成立,更具需要时间进行不同的函数等待,如果等待正确返回,则将 _counter 设置为0, unlock mutex , park 调用成功。
    void Parker::park(bool isAbsolute, jlong time) {
      // Ideally we'd do something useful while spinning, such
      // as calling unpackTime().
    
      // Optional fast-path check:
      // Return immediately if a permit is available.
      // We depend on Atomic::xchg() having full barrier semantics
      // since we are doing a lock-free update to _counter.
      // 原子交换,_counter为1则直接返回,不用休眠
      if (Atomic::xchg(0, &_counter) > 0) return;
    
      Thread* thread = Thread::current();
      assert(thread->is_Java_thread(), "Must be JavaThread");
      JavaThread *jt = (JavaThread *)thread;
    
      // Optional optimization -- avoid state transitions if there's an interrupt pending.
      // Check interrupt before trying to wait
     // 线程处于中断状态,直接返回 
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      // Next, demultiplex/decode time arguments
      timespec absTime;
      if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
        return;
      }
      if (time > 0) {
        // 将时间转为ns形式的绝对时间存到absTime中
        unpackTime(&absTime, isAbsolute, time);
      }
    
    
      // Enter safepoint region
      // Beware of deadlocks such as 6317397.
      // The per-thread Parker:: mutex is a classic leaf-lock.
      // In particular a thread must never block on the Threads_lock while
      // holding the Parker:: mutex.  If safepoints are pending both the
      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
      ThreadBlockInVM tbivm(jt);
    
      // Don't wait if cannot get lock since interference arises from
      // unblocking.  Also. check interrupt before trying wait
      // 再次校验线程是否中断,试着获取锁,获取锁失败退出
      // 这里锁是归属于当前线程的,获锁失败是由于unblocking操作引起的,这时候不用休眠
      // 可以去做唤醒后该做的事。
      if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
        return;
      }
      
      // 走到这说明获锁已经成功了
      int status ;
      // 许可大于0不用再等待
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        // Paranoia to ensure our locked and lock-free paths interact
        // correctly with each other and Java-level accesses.
        // 添加内存屏障,保证_counter的修改对其它线程可见
        OrderAccess::fence();
        return;
      }
    
    #ifdef ASSERT
      // Don't catch signals while blocked; let the running threads have the signals.
      // (This allows a debugger to break into the running thread.)
      sigset_t oldsigs;
      sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
      pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
    #endif
      // 将java线程拥有的操作系统线程设置为convar_wait状态,等待
    // 某个条件发生
      OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
     // 设置_suspend_equivalent为true,不懂要干嘛
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
    
      assert(_cur_index == -1, "invariant");
      if (time == 0) {
        _cur_index = REL_INDEX; // arbitrary choice when not timed
        // 没有时间设置代表永久休眠,条件队列wait操作会先释放互斥锁再休眠,
    // 线程被唤醒时,要先拿到互斥锁才能向下执行,status代表获锁状态
        status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
      } else {
        _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
       // 带超时时间的cond_wait,到了超时时间条件没有发生也会唤醒
        status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
        // 失败的情况下,删除对应的条件队列,重新初始化
        if (status != 0 && WorkAroundNPTLTimedWaitHang) {
          pthread_cond_destroy (&_cond[_cur_index]) ;
          pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
        }
      }
      _cur_index = -1;
      assert_status(status == 0 || status == EINTR ||
                    status == ETIME || status == ETIMEDOUT,
                    status, "cond_timedwait");
    
    #ifdef ASSERT
      pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
    #endif
     
     // 被唤醒后,许可被消耗,释放互斥锁
      _counter = 0 ;
      status = pthread_mutex_unlock(_mutex) ;
      assert_status(status == 0, status, "invariant") ;
      // Paranoia to ensure our locked and lock-free paths interact
      // correctly with each other and Java-level accesses.
      // 这里插入内存屏障,确保_counter的写入操作对其它线程可见
       OrderAccess::fence();
    
      // 如果等待的时候有外部挂起,重新挂起
      // If externally suspended while waiting, re-suspend
      if (jt->handle_special_suspend_equivalent_condition()) {
        jt->java_suspend_self();
      }
    }
    

    从park的实现可以看到:

    1. 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
    2. 线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
    3. park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回
      order fence在linux中的实现
     
    inline void OrderAccess::fence() {
      if (os::is_MP()) {
    #ifdef AMD64
      // 没有使用mfence,因为mfence有时候性能差于使用 locked addl
        __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
    #else    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
    #endif  }
    }
    

    2.2 unpark源码

    unpark的过程就比较简单了,如下:

    void Parker::unpark() {
      int s, status ;
      // 互斥变量加锁
      status = pthread_mutex_lock(_mutex);
      assert (status == 0, "invariant") ;
      s = _counter;
      // 发放许可
      _counter = 1;
      // 小于1代表许可被消耗,线程可能被park
      if (s < 1) {
        // thread might be parked
        if (_cur_index != -1) {
          // thread is definitely parked
         // 如果之前_counter小于1,那么根据_cur_index来判断是否线程已经阻塞了。
    // 如果阻塞根据WorkAroundNPTLTimedWaitHang来选择调用pthread_cond_signal(唤醒线程)和pthread_mutex_unlock(释放_mutex锁)的顺序,
    // 当然为了避免NPTL-FUTEX 阻塞在 pthread_cond_timedwait上,这里默认开启为1。
          if (WorkAroundNPTLTimedWaitHang) {
            // 先唤醒等待线程,再释放互斥锁,防止线程超时醒来,直接抢锁,和唤醒线程的操作有竞争
            status = pthread_cond_signal (&_cond[_cur_index]);
            assert (status == 0, "invariant");
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant");
          } else {
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant");
            status = pthread_cond_signal (&_cond[_cur_index]);
            assert (status == 0, "invariant");
          }
        } else {
          pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant") ;
        }
      } else {
       // 许可还在,说明没有线程挂起,直接解锁
        pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
      }
    }
    

    3 synchronized中park的实现

    synchronized中是靠parkEvent类来实现park和unpark方法来实现。parkEvent类的定义,以及获取释放方式如下,parkEvent和parker不同,parker是线程直接持有,parkEvent是在一个列表中去查找,查找首个节点,为空闲则和当前线程绑定,不为空则新建,使用完后再归还到链表中:

    ParkEvent * ParkEvent::Allocate (Thread * t) {
      ParkEvent * ev ;
     
      //获取锁
      Thread::SpinAcquire(&ListLock, "ParkEventFreeListAllocate");
      {
        //将FreeList链表头从链表中移除
        ev = FreeList;
        if (ev != NULL) {
          FreeList = ev->FreeNext;
        }
      }
      //释放锁
      Thread::SpinRelease(&ListLock);
     
      if (ev != NULL) {
        //如果找到一个空闲的ParkEvent
        guarantee (ev->AssociatedWith == NULL, "invariant") ;
      } else {
        //如果没有找到,则创建一个
        ev = new ParkEvent () ;
        guarantee ((intptr_t(ev) & 0xFF) == 0, "invariant") ;
      }
      //将_Event置为0
      ev->reset() ;                     // courtesy to caller
      //保存关联的线程
      ev->AssociatedWith = t ;          // Associate ev with t
      ev->FreeNext       = NULL ;
      return ev ;
    }
     
    void ParkEvent::Release (ParkEvent * ev) {
      if (ev == NULL) return ;
      guarantee (ev->FreeNext == NULL      , "invariant") ;
      ev->AssociatedWith = NULL ;
      //获取锁
      Thread::SpinAcquire(&ListLock, "ParkEventFreeListRelease");
      {  
        //归还到FreeList链表中
        ev->FreeNext = FreeList;
        FreeList = ev;
      }
      //释放锁
      Thread::SpinRelease(&ListLock);
    }
     
    ParkEvent() : PlatformEvent() {
           AssociatedWith = NULL ;
           FreeNext       = NULL ;
           ListNext       = NULL ;
           ListPrev       = NULL ;
           OnList         = 0 ;
           TState         = 0 ;
           Notified       = 0 ;
           IsWaiting      = 0 ;
        }
     
    PlatformEvent() {
          int status;
          //初始化_cond和_mutex
          status = pthread_cond_init (_cond, os::Linux::condAttr());
          assert_status(status == 0, status, "cond_init");
          status = pthread_mutex_init (_mutex, NULL);
          assert_status(status == 0, status, "mutex_init");
          _Event   = 0 ;
          _nParked = 0 ;
          _Assoc   = NULL ;
        }
     
     void reset() { _Event = 0 ; }
    

    park方法

    int os::PlatformEvent::TryPark() {
      for (;;) {
        const int v = _Event ;
        //_Event只能是0或者1
        guarantee ((v == 0) || (v == 1), "invariant") ;
        //将_Event原子的置为0
        if (Atomic::cmpxchg (0, &_Event, v) == v) return v  ;
      }
    }
     
    void os::PlatformEvent::park() {       // AKA "down()"
      int v ;
      for (;;) {
          v = _Event ;
          //将其原子的减1
          if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
      }
      guarantee (v >= 0, "invariant") ;
      if (v == 0) {
         //获取锁
         int status = pthread_mutex_lock(_mutex);
         assert_status(status == 0, status, "mutex_lock");
         guarantee (_nParked == 0, "invariant") ;
         //已park线程计数加1
         ++ _nParked ;
         //_Event已经原子的减1,变成-1了
         while (_Event < 0) {
            //无期限等待
            status = pthread_cond_wait(_cond, _mutex);
            if (status == ETIME) { status = EINTR; }
            assert_status(status == 0 || status == EINTR, status, "cond_wait");
         }
         //被唤醒了
         //计数减1
         -- _nParked ;
        //重置成0
        _Event = 0 ;
        //释放锁
         status = pthread_mutex_unlock(_mutex);
         assert_status(status == 0, status, "mutex_unlock");
        //让修改立即生效
        OrderAccess::fence();
      }
      guarantee (_Event >= 0, "invariant") ;
    }
     
    int os::PlatformEvent::park(jlong millis) {
      guarantee (_nParked == 0, "invariant") ;
     
      int v ;
      for (;;) {
          v = _Event ;
          //将其原子的减1
          if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
      }
      guarantee (v >= 0, "invariant") ;
      if (v != 0) return OS_OK ;
     
      //计算等待的时间
      struct timespec abst;
      compute_abstime(&abst, millis);
     
      int ret = OS_TIMEOUT;
      //获取锁
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "mutex_lock");
      guarantee (_nParked == 0, "invariant") ;
      //计数加1
      ++_nParked ;
     
      while (_Event < 0) {
        //让线程休眠,底层是pthread_cond_timedwait
        status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);
        //WorkAroundNPTLTimedWaitHang的值默认是1
        if (status != 0 && WorkAroundNPTLTimedWaitHang) {
          pthread_cond_destroy (_cond);
          pthread_cond_init (_cond, os::Linux::condAttr()) ;
        }
        //被中断后就返回EINTR,正常被唤醒就返回0,另外两个是等待超时
        assert_status(status == 0 || status == EINTR ||
                      status == ETIME || status == ETIMEDOUT,
                      status, "cond_timedwait");
        //FilterSpuriousWakeups默认是true              
        if (!FilterSpuriousWakeups) break ;                 // previous semantics
        //如果超时了则退出循环
        if (status == ETIME || status == ETIMEDOUT) break ;
      }
      --_nParked ;
      if (_Event >= 0) {
         ret = OS_OK;
      }
      _Event = 0 ;
      //解锁
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock");
      assert (_nParked == 0, "invariant") ;
      //让修改立即生效
      OrderAccess::fence();
      return ret;
    }
    

    unpark方法
    unpark用于唤醒某个被park方法阻塞的线程,其实现如下:

    void os::PlatformEvent::unpark() {
      //将其原子的置为1,如果原来就是1,说明已经unpark过了,直接返回
      if (Atomic::xchg(1, &_Event) >= 0) return;
     
      //获取锁
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "mutex_lock");
      int AnyWaiters = _nParked;
      assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
      //WorkAroundNPTLTimedWaitHang默认是true
      if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
        AnyWaiters = 0;
        //发信号唤醒该线程,被唤醒后将_nParked置为0
        pthread_cond_signal(_cond);
      }
      //释放锁
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock");
      if (AnyWaiters != 0) {
        //pthread_cond_signal不要求获取锁,此处再次唤醒
        status = pthread_cond_signal(_cond);
        assert_status(status == 0, status, "cond_signal");
      }
    }
    

    4 总结

    lockSupport中park和unpark都是依赖于Parker类来实现,linux中parker类的实现主要是维护了一个许可_counter变量,利用posix(Portable Operating System Interface of UNIX)thread api去实现线程的休眠与唤醒。
    Synchronized中依赖于ParkerEvent类,其中linux的实现,主要是维护_Event的值,然后同样是用pthread_mutex_lock进行加锁,pthread_cond_wait进行线程休眠,按注释说,后续会用parkerEvent替代parker类。

    参考链接

    LockSupport 以及 park、unpark 方法源码分析
    LockSupport的park和unpark的原理
    Hotspot Parker和ParkEvent 源码解析

    相关文章

      网友评论

          本文标题:AQS中LockSupport与Synchronized中par

          本文链接:https://www.haomeiwen.com/subject/kztkurtx.html