美文网首页android开发专题
深入理解Java中的底层阻塞原理及实现

深入理解Java中的底层阻塞原理及实现

作者: Java大生 | 来源:发表于2019-03-02 11:31 被阅读53次

    谈到阻塞,相信大家都不会陌生了。阻塞的应用场景真的多得不要不要的,比如 生产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞队列的实现啊,多简单!

    阻塞,一般有两个特性很亮眼:1. 不耗 CPU 等待;2. 线程安全;

    额,要这么说也 OK 的。毕竟,我们遇到的问题,到这里就够解决了。但是有没有想过,这容器的阻塞又是如何实现的呢?

    好吧,翻开源码,也很简单了:(比如 ArrayBlockingQueue 的 take、put….)

    // ArrayBlockingQueue

    /**

     * Inserts the specified element at the tail of this queue, waiting

     * for space to become available if the queue is full.

     *

     * @throws InterruptedException {@inheritDoc}

     * @throws NullPointerException {@inheritDoc}

     */

    publicvoidput(E e)throwsInterruptedException {

        checkNotNull(e);

    finalReentrantLock lock =this.lock;

        lock.lockInterruptibly();

    try{

    while(count == items.length)

                // 阻塞的点

                notFull.await();

            enqueue(e);

    }finally{

            lock.unlock();

        }

    }

    /**

     * Inserts the specified element at the tail of this queue, waiting

     * up to the specified wait time for space to become available if

     * the queue is full.

     *

     * @throws InterruptedException {@inheritDoc}

     * @throws NullPointerException {@inheritDoc}

     */

    publicbooleanoffer(E e,longtimeout, TimeUnit unit)

    throwsInterruptedException {

        checkNotNull(e);

    longnanos = unit.toNanos(timeout);

    finalReentrantLock lock =this.lock;

        lock.lockInterruptibly();

    try{

    while(count == items.length) {

    if(nanos <= 0)

    returnfalse;

                // 阻塞的点

                nanos = notFull.awaitNanos(nanos);

            }

            enqueue(e);

    returntrue;

    }finally{

            lock.unlock();

        }

    }

    publicE take()throwsInterruptedException {

    finalReentrantLock lock =this.lock;

        lock.lockInterruptibly();

    try{

    while(count == 0)

                // 阻塞的点

                notEmpty.await();

    returndequeue();

    }finally{

            lock.unlock();

        }

    }

    看来,最终都是依赖了AbstractQueuedSynchronizer类(著名的AQS)的await方法,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?

    Java的代码总是好跟踪的:

    // AbstractQueuedSynchronizer.await()

    /**

     * Implements interruptible condition wait.

     * <ol>

     * <li> If current thread is interrupted, throw InterruptedException.

     * <li> Save lock state returned by {@link #getState}.

     * <li> Invoke {@link #release} with saved state as argument,

     *      throwing IllegalMonitorStateException if it fails.

     * <li> Block until signalled or interrupted.

     * <li> Reacquire by invoking specialized version of

     *      {@link #acquire} with saved state as argument.

     * <li> If interrupted while blocked in step 4, throw InterruptedException.

     * </ol>

     */

    publicfinalvoidawait()throwsInterruptedException {

    if(Thread.interrupted())

    thrownewInterruptedException();

        Node node = addConditionWaiter();

    intsavedState = fullyRelease(node);

    intinterruptMode = 0;

    while(!isOnSyncQueue(node)) {

            // 此处进行真正的阻塞

    LockSupport.park(this);

    if((interruptMode = checkInterruptWhileWaiting(node)) != 0)

    break;

        }

    if(acquireQueued(node, savedState) && interruptMode != THROW_IE)

            interruptMode = REINTERRUPT;

    if(node.nextWaiter !=null) // clean up if cancelled

            unlinkCancelledWaiters();

    if(interruptMode != 0)

            reportInterruptAfterWait(interruptMode);

    }

    如上,可以看到,真正的阻塞工作又转交给了另一个工具类:LockSupportpark方法了,这回跟锁扯上了关系,看起来已经越来越接近事实了:

    // LockSupport.park()

    /**

     * Disables the current thread for thread scheduling purposes unless the

     * permit is available.

     *

     * <p>If the permit is available then it is consumed and the call returns

     * immediately; otherwise

     * the current thread becomes disabled for thread scheduling

     * purposes and lies dormant until one of three things happens:

     *

     * <ul>

     * <li>Some other thread invokes {@link #unpark unpark} with the

     * current thread as the target; or

     *

     * <li>Some other thread {@linkplain Thread#interrupt interrupts}

     * the current thread; or

     *

     * <li>The call spuriously (that is, for no reason) returns.

     * </ul>

     *

     * <p>This method does <em>not</em> report which of these caused the

     * method to return. Callers should re-check the conditions which caused

     * the thread to park in the first place. Callers may also determine,

     * for example, the interrupt status of the thread upon return.

     *

     * @param blocker the synchronization object responsible for this

     *        thread parking

     * @since 1.6

     */

    publicstaticvoidpark(Object blocker) {

        Thread t = Thread.currentThread();

        setBlocker(t, blocker);

    UNSAFE.park(false, 0L);

    setBlocker(t,null);

    }

    看得出来,这里的实现就比较简洁了,先获取当前线程,设置阻塞对象,阻塞,然后解除阻塞。

    好吧,到底什么是真正的阻塞,我们还是不得而知!

    UNSAFE.park(false, 0L);是个什么东西? 看起来就是这一句起到了最关键的作用呢!但由于这里已经是 native 代码,我们已经无法再简单的查看源码了!那咋整呢?

    那不行就看C/C++的源码呗,看一下 parker 的定义(park.hpp):

    classParker :publicos::PlatformParker {

    private:

    volatileint_counter ;

      Parker * FreeNext ;

      JavaThread * AssociatedWith ; // Current association

    public:

      Parker() : PlatformParker() {

        _counter       = 0 ;

        FreeNext       = NULL ;

        AssociatedWith = NULL ;

      }

    protected:

      ~Parker() { ShouldNotReachHere(); }

    public:

      // For simplicity of interface with Java, all forms of park (indefinite,

      // relative, and absolute) are multiplexed into one call.  c中暴露出两个方法给java调用

    voidpark(boolisAbsolute, jlongtime);

    voidunpark();

      // Lifecycle operators

    staticParker * Allocate (JavaThread * t) ;

    staticvoidRelease (Parker * e) ;

    private:

    staticParker *volatileFreeList ;

    staticvolatileintListLock ;

    };

    park()方法到底是如何实现的呢? 其实是继承的os::PlatformParker的功能,也就是平台相关的私有实现,以 Linux 平台实现为例(os_linux.hpp):

    // Linux中的parker定义

    classPlatformParker :publicCHeapObj {

    protected:

    enum{

            REL_INDEX = 0,

            ABS_INDEX = 1

        };

    int_cur_index;  // which cond is in use: -1, 0, 1

        pthread_mutex_t _mutex [1] ;

        pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

    public:       // TODO-FIXME: make dtor private

        ~PlatformParker() { guarantee (0, "invariant") ; }

    public:

        PlatformParker() {

    intstatus;

          status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());

          assert_status(status == 0, status, "cond_init rel");

          status = pthread_cond_init (&_cond[ABS_INDEX], NULL);

          assert_status(status == 0, status, "cond_init abs");

          status = pthread_mutex_init (_mutex, NULL);

          assert_status(status == 0, status, "mutex_init");

          _cur_index = -1; // mark as unused

        }

    };

    看到 park.cpp 中没有重写 park() 和 unpark() 方法,也就是说阻塞实现完全交由特定平台代码处理了(os_linux.cpp):

    // park方法的实现,依赖于 _counter, _mutex[1], _cond[2]

    voidParker::park(boolisAbsolute, jlongtime) {

      // Ideally we'd do something useful while spinning, such

      // as calling unpackTime().

      // Optional fast-path check:

      // Return immediately if a permit is available.

      // We depend on Atomic::xchg() having full barrier semantics

      // since we are doing a lock-free update to _counter.

    if(Atomic::xchg(0, &_counter) > 0)return;

    Thread*thread= Thread::current();

    assert(thread->is_Java_thread(), "Must be JavaThread");

    JavaThread *jt = (JavaThread *)thread;

      // Optional optimization -- avoid state transitions if there's an interrupt pending.

      // Check interrupt before trying to wait

    if(Thread::is_interrupted(thread,false)) {

    return;

      }

      // Next, demultiplex/decode time arguments

      timespec absTime;

    if(time< 0 || (isAbsolute &&time== 0) ) { // don't wait at all

    return;

      }

    if(time> 0) {

    unpackTime(&absTime, isAbsolute,time);

      }

      // Enter safepoint region

      // Beware of deadlocks such as 6317397.

      // The per-thread Parker:: mutex is a classic leaf-lock.

      // In particular a thread must never block on the Threads_lock while

      // holding the Parker:: mutex.  If safepoints are pending both the

      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.

      ThreadBlockInVM tbivm(jt);

      // Don't wait if cannot get lock since interference arises from

      // unblocking.  Also. check interrupt before trying wait

    if(Thread::is_interrupted(thread,false) || pthread_mutex_trylock(_mutex) != 0) {

    return;

      }

    intstatus ;

    if(_counter > 0)  { // no wait needed

        _counter = 0;

        status = pthread_mutex_unlock(_mutex);

    assert(status == 0, "invariant") ;

        // Paranoia to ensure our locked and lock-free paths interact

        // correctly with each other and Java-level accesses.

        OrderAccess::fence();

    return;

      }

    #ifdef ASSERT

      // Don't catch signals while blocked; let the running threads have the signals.

      // (This allows a debugger to break into the running thread.)

      sigset_t oldsigs;

      sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();

      pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

    #endif

    OSThreadWaitState osts(thread->osthread(),false/* not Object.wait() */);

      jt->set_suspend_equivalent();

      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

    assert(_cur_index == -1, "invariant");

    if(time== 0) {

        _cur_index = REL_INDEX; // arbitrary choice when not timed

        status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;

    }else{

        _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;

        status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;

    if(status != 0 && WorkAroundNPTLTimedWaitHang) {

          pthread_cond_destroy (&_cond[_cur_index]) ;

          pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());

        }

      }

      _cur_index = -1;

      assert_status(status == 0 || status == EINTR ||

                    status == ETIME || status == ETIMEDOUT,

                    status, "cond_timedwait");

    #ifdef ASSERT

      pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

    #endif

      _counter = 0 ;

      status = pthread_mutex_unlock(_mutex) ;

      assert_status(status == 0, status, "invariant") ;

      // Paranoia to ensure our locked and lock-free paths interact

      // correctly with each other and Java-level accesses.

      OrderAccess::fence();

      // If externally suspended while waiting, re-suspend

    if(jt->handle_special_suspend_equivalent_condition()) {

        jt->java_suspend_self();

      }

    }

    // unpark 实现,相对简单些

    voidParker::unpark() {

    ints, status ;

      status = pthread_mutex_lock(_mutex);

    assert(status == 0, "invariant") ;

      s = _counter;

      _counter = 1;

    if(s < 1) {

        // thread might be parked

    if(_cur_index != -1) {

          // thread is definitely parked

    if(WorkAroundNPTLTimedWaitHang) {

            status = pthread_cond_signal (&_cond[_cur_index]);

    assert(status == 0, "invariant");

            status = pthread_mutex_unlock(_mutex);

    assert(status == 0, "invariant");

    }else{

            // must capture correct index before unlocking

    intindex = _cur_index;

            status = pthread_mutex_unlock(_mutex);

    assert(status == 0, "invariant");

            status = pthread_cond_signal (&_cond[index]);

    assert(status == 0, "invariant");

          }

    }else{

          pthread_mutex_unlock(_mutex);

    assert(status == 0, "invariant") ;

        }

    }else{

        pthread_mutex_unlock(_mutex);

    assert(status == 0, "invariant") ;

      }

    }

    从上面代码可以看出,阻塞主要借助于三个变量,_cond、_mutex、_counter, 调用 Linux 系统的pthread_cond_wait、pthread_mutex_lock、pthread_mutex_unlock(一组 POSIX 标准的阻塞接口)等平台相关的方法进行阻塞了!

    而 park.cpp 中,则只有  Allocate、Release 等的一些常规操作!

    // 6399321 As a temporary measure we copied & modified the ParkEvent::

    // allocate() and release() code for use by Parkers.  The Parker:: forms

    // will eventually be removed as we consolide and shift over to ParkEvents

    // for both builtin synchronization and JSR166 operations.

    volatileintParker::ListLock = 0 ;

    Parker *volatileParker::FreeList = NULL ;

    Parker * Parker::Allocate (JavaThread * t) {

      guarantee (t != NULL, "invariant") ;

      Parker * p ;

      // Start by trying to recycle an existing but unassociated

      // Parker from the global free list.

      // 8028280: using concurrent free list without memory management can leak

      // pretty badly it turns out.

      Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");

      {

        p = FreeList;

    if(p != NULL) {

          FreeList = p->FreeNext;

        }

      }

      Thread::SpinRelease(&ListLock);

    if(p != NULL) {

        guarantee (p->AssociatedWith == NULL, "invariant") ;

    }else{

        // Do this the hard way -- materialize a new Parker..

    p =newParker() ;

      }

      p->AssociatedWith = t ;          // Associate p with t

      p->FreeNext       = NULL ;

    returnp ;

    }

    voidParker::Release (Parker * p) {

    if(p == NULL)return;

      guarantee (p->AssociatedWith != NULL, "invariant") ;

      guarantee (p->FreeNext == NULL      , "invariant") ;

      p->AssociatedWith = NULL ;

      Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");

      {

        p->FreeNext = FreeList;

        FreeList = p;

      }

      Thread::SpinRelease(&ListLock);

    }

    综上源码,在进行阻塞的时候,底层并没有(并不一定)要用 while 死循环来阻塞,更多的是借助于操作系统的实现来进行阻塞的。当然,这也更符合大家的猜想!

    从上的代码我们也发现一点,底层在做许多事的时候,都不忘考虑线程中断,也就是说,即使在阻塞状态也是可以接收中断信号的,这为上层语言打开了方便之门。

    如果要细说阻塞,其实还远没完,不过再往操作系统层面如何实现,就得再下点功夫,去翻翻资料了,把底线压在操作系统层面,大多数情况下也够用了!

    欢迎学Java和大数据的朋友们加入java架构交流: 855835163

    加群链接:https://jq.qq.com/?_wv=1027&amp;k=5dPqXGI

    群内提供免费的架构资料还有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的免费直播讲解  可以进来一起学习交流哦

    相关文章

      网友评论

        本文标题:深入理解Java中的底层阻塞原理及实现

        本文链接:https://www.haomeiwen.com/subject/gqreuqtx.html