美文网首页
JAVA多线程(二)——详解 AbstractQueuedSyn

JAVA多线程(二)——详解 AbstractQueuedSyn

作者: 云中人山 | 来源:发表于2018-08-26 14:37 被阅读10次

    本文为他人博客,转自:详解 AbstractQueuedSynchronizer 由于某种神秘力量,很多人不能访问,故直接搬运

    前言

    队列同步器 AbstractQueuedSynchronizer(以下简称 AQS),是用来构建锁或者其他同步组件的基础框架。它使用一个 int 成员变量来表示同步状态,通过 CAS 操作对同步状态进行修改,确保状态的改变是安全的。通过内置的 FIFO (First In First Out)队列来完成资源获取线程的排队工作。更多关于 Java 多线程的文章可以转到 这里

    AQS 和 synchronized

    在介绍 AQS 的使用之前,需要首先说明一点,AQS 同步和 synchronized 关键字同步(以下简称 synchronized 同步)是采用的两种不同的机制。首先看下 synchronized 同步,synchronized 关键字经过编译之后,会在同步块的前后分别形成 monitorenter 和 monitorexit 这两个字节码指令,这两个字节码需要关联到一个监视对象,当线程执行 monitorenter 指令时,需要首先获得获得监视对象的锁,这里监视对象锁就是进入同步块的凭证,只有获得了凭证才可以进入同步块,当线程离开同步块时,会执行 monitorexit 指令,释放对象锁。

    在 AQS 同步中,使用一个 int 类型的变量 state 来表示当前同步块的状态。以独占式同步(一次只能有一个线程进入同步块)为例,state 的有效值有两个 0 和 1,其中 0 表示当前同步块中没有线程,1 表示同步块中已经有线程在执行。当线程要进入同步块时,需要首先判断 state 的值是否为 0,假设为 0,会尝试将 state 修改为 1,只有修改成功了之后,线程才可以进入同步块。注意上面提到的两个条件:

    • state 为 0,证明当前同步块中没有线程在执行,所以当前线程可以尝试获得进入同步块的凭证,而这里的凭证就是是否成功将 state 修改为 1(在 synchronized 同步中,我们说的凭证是对象锁,但是对象锁的最终实现是否和这种方式类似,没有找到相关的资料)
    • 成功将 state 修改为 1,通过使用 CAS 操作,我们可以确保即便有多个线程同时修改 state,也只有一个线程会修改成功。关于 CAS 的具体解释会在后面提到。

    当线程离开同步块时,会修改 state 的值,将其设为 0,并唤醒等待的线程。所以在 AQS 同步中,我们说线程获得了锁,实际上是指线程成功修改了状态变量 state,而线程释放了锁,是指线程将状态变量置为了可修改的状态(在独占式同步中就是置为了 0),让其他线程可以再次尝试修改状态变量。在下面的表述中,我们说线程获得和释放了锁,就是上述含义, 这与 synchronized 同步中说的获得和释放锁的含义不同,需要区别理解。

    基本使用

    本节摘自 Java 并发编程的艺术

    AQS 的设计是基于模板方法的,使用者需要继承 AQS 并重写指定的方法。在后续的流程中,AQS 提供的模板方法会调用重写的方法。一般来说,我们需要重写的方法主要有下面 5 个:

    方法名称 描述
    protected boolean tryAcquire(int) 独占式获取锁,实现该方法需要查询当前状态并判断同步状态是否和预期值相同,然后使用 CAS 操作设置同步状态
    protected boolean tryRelease(int) 独占式释放锁,实际也是修改同步变量
    protected int tryAcquireShared(int) 共享式获取锁,返回大于等于 0 的值,表示获取锁成功,反之获取失败
    protected boolean tryReleaseShared(int) 共享式释放锁
    protected boolean isHeldExclusively() 判断调用该方法的线程是否持有互斥锁

    在自定义的同步组件中,我们一般会调用 AQS 提供的模板方法。AQS 提供的模板方法基本上分为 3 类: 独占式获取与释放锁、共享式获取与释放锁以及查询同步队列中的等待线程情况。下面是相关的模板方法:

    方法名称 描述
    void acquire(int) 独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待。该方法会调用重写的 tryAcquire(int arg) 方法判断是否可以获得锁
    void acquireInterruptibly(int) 和 acquire(int) 相同,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出 InterruptedException 异常并返回。
    boolean tryAcquireNanos(int, long) 在 acquireInterruptibly(int) 基础上添加了超时控制,同时支持中断和超时,当在指定时间内没有获得锁时,会返回 false,获取到了返回 true
    void acquireShared(int) 共享式获得锁,如果成功获得锁就返回,否则将当前线程放入同步队列等待,与独占式获取锁的不同是,同一时刻可以有多个线程获得共享锁,该方法调用 tryAcquireShared(int)
    acquireSharedInterruptibly(int) 与 acquireShared(int) 相同,该方法响应中断
    tryAcquireSharedNanos(int, long) 在 acquireSharedInterruptibly(int) 基础上添加了超时控制
    boolean release(int) 独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒
    boolean releaseShared(int) 共享式释放锁
    Collection<thread> getQueuedThreads()</thread> 获得同步队列中等待的线程集合

    自定义组件通过使用同步器提供的模板方法来实现自己的同步语义。下面我们通过两个示例,看下如何借助于 AQS 来实现锁的同步语义。我们首先实现一个独占锁(排它锁),独占锁就是说在某个时刻内,只能有一个线程持有独占锁,只有持有锁的线程释放了独占锁,其他线程才可以获取独占锁。下面是具体实现:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    /**
     * Created by Jikai Zhang on 2017/4/9.
     * <p>
     * 自定义共享锁
     */
    public class TwinsLock implements Lock {
        private static class Sync extends AbstractQueuedSynchronizer {
            public Sync(int resourceCount) {
                if (resourceCount <= 0) {
                    throw new IllegalArgumentException("resourceCount must be larger than zero.");
                }
                // 设置可以共享的资源总数
                setState(resourceCount);
            }
            @Override
            protected int tryAcquireShared(int reduceCount) {
                // 使用尝试获得资源,如果成功修改了状态变量(获得了资源)
                // 或者资源的总量小于 0(没有资源了),则返回。
                for (; ; ) {
                    int lastCount = getState();
                    int newCount = lastCount - reduceCount;
                    if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                        return newCount;
                    }
                }
            }
            @Override
            protected boolean tryReleaseShared(int returnCount) {
                // 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
                for (; ; ) {
                    int lastCount = getState();
                    int newCount = lastCount + returnCount;
                    if (compareAndSetState(lastCount, newCount)) {
                        return true;
                    }
                }
            }
        }
        // 定义两个共享资源,说明同一时间内可以有两个线程同时运行
        private final Sync sync = new Sync(2);
        @Override
        public void lock() {
            sync.acquireShared(1);
        }
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        @Override
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
        public static void main(String[] args) {
            final Lock lock = new TwinsLock();
            int threadCounts = 10;
            Thread threads[] = new Thread[threadCounts];
            for (int i = 0; i < threadCounts; i++) {
                final int index = i;
                threads[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 5; i++) {
                            lock.lock();
                            try {
                                TimeUnit.SECONDS.sleep(1);
                                System.out.println(Thread.currentThread().getName());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } finally {
                                lock.unlock();
                            }
                            try {
                                TimeUnit.SECONDS.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
            for (int i = 0; i < threadCounts; i++) {
                threads[i].start();
            }
        }
    }
    

    程序的运行结果如下面所示。我们看到使用了 Mutex 之后,线程 0 和线程 1 不会再交替执行,而是当一个线程执行完,另外一个线程再执行。

    Without mutex:
    Thread-0: j =0
    Thread-1: j =0
    Thread-0: j =20000
    Thread-1: j =20000
    Thread-0: j =40000
    Thread-1: j =40000
    Thread-0: j =60000
    Thread-1: j =60000
    Thread-1: j =80000
    Thread-0: j =80000
    With mutex:
    Thread-0: j =0
    Thread-0: j =20000
    Thread-0: j =40000
    Thread-0: j =60000
    Thread-0: j =80000
    Thread-1: j =0
    Thread-1: j =20000
    Thread-1: j =40000
    Thread-1: j =60000
    Thread-1: j =80000
    
    

    下面在看一个共享锁的示例。在该示例中,我们定义两个共享资源,即同一时间内允许两个线程同时执行。我们将同步变量的初始状态 state 设为 2,当一个线程获取了共享锁之后,将 state 减 1,线程释放了共享锁后,将 state 加 1。状态的合法范围是 0、1 和 2,其中 0 表示已经资源已经用光了,此时线程再要获得共享锁就需要进入同步序列等待。下面是具体实现:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    /**
     * Created by Jikai Zhang on 2017/4/9.
     * <p>
     * 自定义共享锁
     */
    public class TwinsLock implements Lock {
        private static class Sync extends AbstractQueuedSynchronizer {
            public Sync(int resourceCount) {
                if (resourceCount <= 0) {
                    throw new IllegalArgumentException("resourceCount must be larger than zero.");
                }
                // 设置可以共享的资源总数
                setState(resourceCount);
            }
            @Override
            protected int tryAcquireShared(int reduceCount) {
                // 使用尝试获得资源,如果成功修改了状态变量(获得了资源)
                // 或者资源的总量小于 0(没有资源了),则返回。
                for (; ; ) {
                    int lastCount = getState();
                    int newCount = lastCount - reduceCount;
                    if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                        return newCount;
                    }
                }
            }
            @Override
            protected boolean tryReleaseShared(int returnCount) {
                // 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
                for (; ; ) {
                    int lastCount = getState();
                    int newCount = lastCount + returnCount;
                    if (compareAndSetState(lastCount, newCount)) {
                        return true;
                    }
                }
            }
        }
        // 定义两个共享资源,说明同一时间内可以有两个线程同时运行
        private final Sync sync = new Sync(2);
        @Override
        public void lock() {
            sync.acquireShared(1);
        }
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        @Override
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
        public static void main(String[] args) {
            final Lock lock = new TwinsLock();
            int threadCounts = 10;
            Thread threads[] = new Thread[threadCounts];
            for (int i = 0; i < threadCounts; i++) {
                final int index = i;
                threads[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 5; i++) {
                            lock.lock();
                            try {
                                TimeUnit.SECONDS.sleep(1);
                                System.out.println(Thread.currentThread().getName());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } finally {
                                lock.unlock();
                            }
                            try {
                                TimeUnit.SECONDS.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
            for (int i = 0; i < threadCounts; i++) {
                threads[i].start();
            }
        }
    }
    

    运行程序,我们会发现程序每次都会同时打印两条语句,如下面的形式,证明同时有两个线程在执行。

    Thread-0
    Thread-1
    Thread-3
    Thread-2
    Thread-8
    Thread-4
    Thread-3
    Thread-6
    

    相关文章

      网友评论

          本文标题:JAVA多线程(二)——详解 AbstractQueuedSyn

          本文链接:https://www.haomeiwen.com/subject/awwniftx.html