LockSupport与AQS

作者: jqdywolf | 来源:发表于2018-04-17 10:06 被阅读492次

    LockSupport

    LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。

    public class LockSupport {
        public static void park() {
            UNSAFE.park(false, 0L);
        }
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    }
    

    每个线程都会有一个独有的permit(许可)。

    • unpark方法:使得所给的线程的permit置为ok。如果所给的线程还没开始,则这个方法并不保证有效。
    • park方法:如果当前线程的permit是ok的,则继续执行,否则阻塞等待,直到以下任意一件事发生为止:
      1. 其他线程调用unpark方法,传入的thread是当前线程。
      2. 其他线程interrupt当前线程
      3. 因为The call spuriously 而导致无原因返回
    • 注意:park函数并不会提供report说明是因为上述的哪种原因返回,需要Caller re-check而得知是因为哪种原因返回的。(即如果是2,则park方法也不会抛出InterruptedException异常)
    • 我们看到park和unpark调用的都是UNSAFE的park和unpark,那UNSAFE下的这些函数怎么实现的呢?其实UNSAFE包下的函数都是native的,即这些方法都是C/C++实现的,并且被编译成了DLL,由Java调用。

    Java中的native关键字

    • 使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言实现的,并且被编译成了DLL,由java去调用。 这些函数的实现体在DLL中,JDK的源代码中并不包含,你应该是看不到的。对于不同的平台它们也是不同的。这也是java的底层机制,实际上java就是在不同的平台上调用不同的native方法实现对操作系统的访问的。
    • native的意思就是通知操作系统, 这个函数你必须给我实现,因为我要使用。 所以native关键字的函数都是操作系统实现的, java只能调用。
    • java是跨平台的语言,既然是跨了平台,所付出的代价就是牺牲一些对底层的控制,而java要实现对底层的控制,就需要一些其他语言的帮助,这个就是native的作用了
    为何引入?

    相比较于wait/notify/notifyAll有何优点?

    • 我们必须先让wait方法运行,然后再调用notify/notifyAll方法来唤醒。
    • 而LockSupport不是,park和unpark的方法执行无需先后。
      即线程A可以先执行unpark,然后线程B执行park的时候就不会阻塞。

    注意:LockSupport是不可重入的:unpark三次之后,park一次可以继续运行,再次park还是会被阻塞。可以理解为unpark是把某个标志位标为1,并不是加1。park是将这个标志位标为0,而非减1。

    总结
    • LockSupport是Java6引入的一种同步原语,和wait/notify/notifyAll类似
    • LockSupport不限制park和unpark的调用顺序
    • LockSupport是不可重入的
    • LockSupport主要用于其他同步器的实现,比如AQS。

    AQS

    介绍

    AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),它是JUC并发包中的核心基础组件。
    AQS简单地说就是使用一个FIFO的等待队列和一个volatile int state来实现同步的。即通过CAS state判断是否被锁(CAS来保证原子性、volatile保证可见性),将阻塞的线程打包放入等待队列中。
    1. AQS的使用者一般定义一个内部类来继承AQS,使用组合的方式使用。
    2. AQS有两种模式:排他和共享。
    排他模式:只有一个线程可以拥有锁。(排他锁)
    共享模式:可以同时多个线程拥有锁。(读锁)
    AQS中两种模式下的waiting thread共用一个queue,所以一般使用者都只是使用一种模式。ReentrantReadWriteLock是同时使用了两种模式。

    定位

    使用者继承AQS,实现AQS中的几个未实现的方法。然后就可以调用AQS的方法来实现自己的接口功能了。

    1. 举个例子:ReentrantLock
    //简略代码,仅用来说明调用关系
    class ReentrantLock{
        private final Sync sync;
        static class Sync extends AbstractQueuedSynchronizer{
            protected final boolean tryRelease(int releases){/*code*/}
            protected final boolean isHeldExclusively(){/*code*/}
            protected final boolean tryAcquire(int acquires){/*code*/}
        }
        public void lock(){
            sync.acquire();//acquire方法是AQS类中实现好的函数
        }
        public void unlock() {
            sync.release(1);//release方法是AQS类中实现好的函数
        }
    }
    

    我们可以看到ReentrantLock使用一个内部类Sync来继承AQS,然后实现排他锁的三个方法。
    我们知道ReentrantLock有lock和unlock接口,可以看到这两个接口的实现就是调用AQS原有的方法。

    • 问题:为何必须要实现三个方法?
      因为AQS中这三个方法是空的,必须由使用者来定义。这么做的目的是:使用者可以自己定义state的代表含义(比如=0代表无锁,>0代表有锁),通过自定义的函数可以实现诸如是否重入、是否公平等功能。
    • 问题:我们实现了这三个方法,为何使用时却调用的是AQS原有的方法?
      因为AQS原有的方法(比如acquire、release)中调用了上述的三个方法,AQS已经帮我们实现好了逻辑。
    1. AQS未实现的方法
    //非堵塞获取独占资源,true表示成功
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //非堵塞释放独占资源,true表示成功
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    //在排它模式下,状态是否被占用
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    //非堵塞获取共享资源,负数表示失败,0表示成功但不需要向后传播,大于0表示成功且可以向后传播
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    //非堵塞释放共享资源,true表示成功
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    前三个是排他锁所要实现的,后两个是共享锁所要实现了。注意:这五个函数并不是abstract,原因是因为一般都是使用某一种模式(排他或共享模式),所以子类只需使用其中一组就可以了。

    AQS的类结构
    private transient volatile Node head;//队列头结点
    private transient volatile Node tail;//队列尾结点
    private volatile int state;//同步状态
    static final long spinForTimeoutThreshold = 1000L;//自旋最大时间
    
    1. head和tail都是lazy init,即当有第一个结点入队列的时候采取初始化。且head指向的是一个空结点,head->next指向的才是第一个结点。
    2. Node是一个等待线程的信息封装。AQS将阻塞等待的线程封装为一个内部类Node,形成队列。
    3. AQS内部线程的挂起和唤醒使用的是LockSupport的pack/unpack,所以AQS内部会有对pack之后的唤醒操作的check。
    4. 对state和队列的操作都是通过CAS来实现的。stateCAS失败意思就是获取锁失败,下面的步骤就是等待加入队列了。而入队列和出队列的操作,如果CAS失败,AQS的设计是通过自旋来一直尝试,直到达到spinForTimeoutThreshold。官方上说这种队列叫做CLH队列。
    AQS中加锁和解锁的方法

    在使用AQS的类中用来加锁和解锁的方法。

    1. 排他模式
    //获取锁,如果获取失败则加入队列
    public final void acquire(int arg);
    //获取锁,如果获取失败则加入队列;支持中断取消等待的线程
    public final void acquireInterruptibly(int arg) throws InterruptedException;
    //获取锁,如果获取失败则加入队列;带有超时时间的
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
    //释放锁
    public final boolean release(int arg);
    

    这里我们可以看到“获取锁,如果失败则加入队列”这个行为是由AQS来实现的。而如何判断失败?这个是由子类来决定的。这个决定支持了可重入性、是否公平性等功能。

    1. 共享模式
    public final void acquireShared(int arg);
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException;
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
    public final boolean releaseShared(int arg);
    

    共享模式下的对应的四个方法。

    AQS公平性

    我们知道公平锁:先来的一定先获取锁。
    非公平锁:当多个线程在争取锁,谁先获取锁的顺序是不固定的。
    AQS的公平性是由使用者来决定的。
    我们知道AQS中的acquire函数是大致这样实现的。

     * Acquire:
     *     while (!tryAcquire(arg)) {
     *        <em>enqueue thread if it is not already queued</em>;
     *        <em>possibly block current thread</em>;
     *     }
    

    因为每次acquire的步骤是:先try再入队列。所以就可以出现这种情况:队列中有两个线程在等待,当锁被释放时,刚好又来了一个线程,则try的时候成功了,这样这个线程就获得锁了。
    如果想要实现公平锁:tryAcquire的时候判断一下,如果有线程在等待,这个函数直接返回false。
    显然非公平锁要比公平锁的效果要高。

    AQS中的Condition
    1. wait/notify/notifyAll以及Condition.await/signal/signalAll区别
      我们知道线程间的通信我们有两种方法:wait/notify/notifyAll以及Condition.await/signal/signalAll
      两者的比较(Condition的优点):
      • Condition的await方法是可以有超时参数的且可以被中断,而wait方法显然不行。
      • 多个Condition可以和一个Lock绑定。
        注意:await/signal/signalAll必须在lock和unlock代码块内使用。
    2. Condition的实现者(唯一实现者)--AQS
      而Condition的实现者(唯一实现者)就是AQS中的ConditionObject。而ConditionObject的实现也是一个等待队列来完成。await就是往队列中加入线程Node且挂起线程,signal就是从队列中唤醒一个线程。
      注意:Condition只在支持排他模式下起作用。
    3. Object.wait/Condition.await/LockSupport.park三者的关系
      Object.wait/Condition.await/LockSupport.park三者的关系是:
      Condition.await底层是LockSupport.park来实现的。
      Object.wait底层是通过关联Monitor来实现的。
    AQS中的几个使用者
    AQS的使用者们
    • ReentrantLock使用了排他模式
    • ReentrantReadWriteLock使用了两种模式。
    • Semaphore使用了共享模式:
      初始化一定数量的permit,acquire会消耗一个,release会释放一个。一般用于控制进入一块代码的最大线程数。
      举个例子:
    class Pool {
        private static final int MAX_AVAILABLE = 100;
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
        public Object getItem() throws InterruptedException {
          available.acquire();
          return getNextAvailableItem();
        }
    
        public void putItem(Object x) {
          if (markAsUnused(x))
            available.release();
        }
    }
    
    • CountDownLatch使用了共享模式:
      初始化一个数量N,线程调用await方法会被阻塞,线程调用countDown方法会将N-1,当N被减为0时,所有线程同时被唤醒。有点像C++中的栅栏一样。
      常见场景:一个线程等待N个线程完成某件事件(join方法类似),或N个线程等待一个线程的某个动作(像是一个线程开枪,其他线程同时跑)。
      注意的是CountDownLatch的N不可被reset,即N减到0之后,无法再被设为N。想要使用必须重新new一个对象。
    • 与之对应的是CyclicBarrier
      初始化一个N,当一个线程执行到await时,会被阻塞。当N个线程都达到await方法时,N个线程同时被唤醒执行。
      这里的Cycle的意思是当N个线程同时达到await被唤醒后,这个CyclicBarrier对象可以继续用。
      举个例子:多个write同时都写完之后,再同时do other thing。
    public class Test {
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
            for(int i=0;i<N;i++)
                new Writer(barrier).start();
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
            @Override
            public void run() {
                try {
                    //write some data
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // do other thing
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:LockSupport与AQS

        本文链接:https://www.haomeiwen.com/subject/uiuckftx.html