美文网首页
第5章 Java的锁

第5章 Java的锁

作者: 红袖者 | 来源:发表于2018-01-25 23:10 被阅读0次
    基本概念:

    锁:控制多线程并发访问资源;
    队列同步器:管理同步状态,实现锁;
    同步状态:同步器的操作对象,int类型;
    同步队列:同步器通过同步队列管理同步状态;

    同步器实现锁:

    1.自定义同步器;
    2.同步器定义如何获取、释放同步状态;
    3.锁通过同步器来实现语义;

    public class Mutex implements Lock {
    
        //自定义的同步器
        static class MySyncer extends AbstractQueuedSynchronizer {
    
            //独占式获取同步状态
            @Override
            protected boolean tryAcquire(int arg) {
                if(compareAndSetState(0, 1)){
                    //设置当前线程独占同步器
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            //独占式释放同步状态
            @Override
            protected boolean tryRelease(int arg) {
                if(getState() == 0)
                    throw new IllegalMonitorStateException();
                //设置同步器无占用线程
                setExclusiveOwnerThread(null);
                //设置同步状态为0
                setState(0);
                return true;
            }
    
            //同步器是否被独占
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            Condition newCondition(){
                return new ConditionObject();
            }
        }
    
        //同步器对象,用来实现锁
        private final MySyncer syncer = new MySyncer();
    
        @Override
        public void lock() {
            syncer.acquire(1);
        }
    
        @Override
        public boolean tryLock() {
            return syncer.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return syncer.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            syncer.acquireInterruptibly(1);
        }
    
        @Override
        public void unlock() {
            syncer.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return syncer.newCondition();
        }
        
    } 
    
    同步器get/update同步状态:
    getState(); //获取同步状态
    setState(); //设置同步状态
    compareAndSetState(int expect, int update); //CAS设置同步状态,保证原子性
    
    同步器acquire/release同步状态(可重写):
    //模板方法调用,非阻塞
    tryAcquire(int arg); //独占式占用同步状态
    tryRelease(int arg); //独占式释放同步状态
    tryAcquireShared(int arg); //共享式占用同步状态
    tryReleaseShared(int arg); //共享式释放同步状态
    

    注:单词get与acquire区别,get指直接获取,acquire指通过努力后获取/占用;

    同步器的模板方法(不可重写):

    锁通过同步器对象直接调用模板方法来实现语义;
    模板方法调用上面tryXxxx(int arg)方法来实现状态占用/释放;

    //独占式,阻塞式
    acquire(int arg); //独占式占用同步状态
    acquireInterruptibly(int arg); //独占式,可响应中断
    tryAcquireNanos(int arg, long nanosTimesout); //独占式,可响应中断,超时等待
    release(int arg); //独占式释放同步状态
    //共享式,阻塞式
    acquireShared(int arg); //共享式占用同步状态
    acquireSharedInterruptibly(int arg); //共享式,可响应中断
    tryAcquireSharedNanos(int arg, long nanosTimesout); //共享式, 可响应中断,超时等待
    releaseShared(int arg);//共享式释放同步状态
    

    注:阻塞式指同步器未获取同步状态时线程会阻塞,非阻塞指线程不会阻塞;

    同步队列:
    FIFO同步队列

    1.同步队列是由Node组成的链表;
    2.当线程获取锁失败时,就把thread及等待状态信息包装成node插入队列尾部,并更改tail节点,通过CAS操作保证并发性;
    3.当head节点的线程释放锁时,就唤醒其next节点,next节点获取同步状态成功时就把自己设为head节点;

    独占式(共享式)占用和释放同步状态:
    独占式/共享式占用同步状态流程图
    区别:同一时刻,独占式只有一个线程可以获取同步状态,而共享式可以有多个线程获取同步状态;
    相同点:第一次获取同步状态失败后都生成节点,插入同步队列尾部;都是通过自旋来获取同步状态;前驱为头节点时才能获取到同步状态;
    自定义同步主键TwinsLock-双线程锁:

    同一时刻有两个线程可以持有锁,即同步器共享式占用同步状态,且同步状态数为2;

    public class TwinsLock implements Lock {
    
        //同步器对象
        private Syncer syncer = new Syncer(2);
    
        static class Syncer extends AbstractQueuedSynchronizer {
    
            //初始化同步状态的数量
            public Syncer(int stateCount) {
                setState(stateCount);
            }
    
            //共享式获取同步状态,非阻塞
            @Override
            protected int tryAcquireShared(int arg) {
                for (; ; ) {
                    int currentState = getState();
                    int newState = currentState - 1;
                    if (newState < 0 || compareAndSetState(currentState, newState))
                        return newState;
                }
            }
    
            //共享式释放同步状态,非阻塞
            @Override
            protected boolean tryReleaseShared(int arg) {
                for (; ; ) {
                    int currentState = getState();
                    if (compareAndSetState(currentState, currentState + 1))
                        return true;
                }
            }
        }
    
        //加锁
        @Override
        public void lock() {
            //acquireShared()会调用tryAcquireShared()
            syncer.acquireShared(1);
        }
    
        //释放锁
        @Override
        public void unlock() {
            syncer.releaseShared(1);
        }
    }
    

    lock()调用模板方法acquireShared()共享式获取同步状态, red()调用用户自定义的tryAcquireShared()来获取同步状态,若获取成功,则线程拥有了锁;若tryAcquireShared()获取同步状态失败,则把线程及其状态包装成node插入同步队列的尾部,并进行自旋来获取同步状态;自旋过程中,当node的前置节点是head节点且释放同步状态后,当前节点就调用tryAcquireShared()来获取同步状态,若获取成功则表示线程拥有了锁,若获取失败则继续自旋;

    重入锁:

    重入锁指线程获取到锁后能够再次获取该锁,而不会被锁阻塞;
    公平模式:
    锁的获取顺序符合请求的绝对时间顺序;

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //同步状态
            int c = getState();
            if (c == 0) {
                //锁未被任何线程占用
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    //没有更早的线程处于WAITTING状态,且当前线程获取同步状态成功
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                //锁被当前线程占用
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    

    非公平模式:
    有新线程请求锁时,先争夺一下锁,没成功再去排队;排队之后依然满足FIFO规则,即前面节点的线程先获取锁;

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //同步状态
            int c = getState();
            if (c == 0) {
                //锁未被任何线程占用
                if (compareAndSetState(0, acquires)) {
                    //当前线程获取同步状态成功
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                //锁被当前线程占用
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    

    总结:公平和非公平模式下调用lock()获取锁时,调用acquire()模板方法,acquire()方法首先都调用tryAcquire(1)第一次尝试获取同步状态,若锁此刻锁未被占用,则公平模式下会判断是否有更早的线程处于WAITTING状态,若有则第一次尝试获取锁失败,若没有则尝试获取同步状态;而非公平模式下则直接尝试获取同步状态,不考虑是否有更早的线程是否处于WAITTING状态;公平模式和非公平模式在第一次尝试获取同步状态失败后,都会把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态,公平和非公平模式下都需要等待前置节点来唤醒自己;
    性能对比:公平锁具有大量的线程切换,因此其吞吐性不如非公平锁;

    读写锁:

    排它锁:同一时刻只能有一个线程获取锁(如ReentrantLock);
    1.读锁是非排它锁,同一时刻可以有多个读线程获取锁;但是写线程获取锁时,所有读线程和其它写线程均被阻塞;
    2.读写锁是一对锁,包括读锁(可重入共享锁)和写锁(可重入排它锁);
    3.包括公平模式和非公平模式;
    4.支持重进入,且在获取写锁后还能获取读锁,但获取读锁后不能获取写锁;
    5.在读多于写的情况下,读写锁比排它锁具有更好的吞吐性;
    读写状态的设计:
    整型变量的高16位表示读、低16位表示写,则线程获取读锁后,同步状态S=S+(1<<16),线程获取写锁后,同步状态S=S+1;
    写锁的获取与释放:
    写锁获取成功的条件:1.写锁此刻被当前线程拥有;2.读锁or写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;

    protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                //存在读锁or写锁
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    //存在读锁or不是当前线程获取写锁,则获取写锁失败
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                //之前是当前线程获取的写锁,因此不存在并发问题,不需要CAS操作
                setState(c + acquires);
                return true;
            }
            //之前没任何线程获取读锁or写锁,此刻可能有多个写线程并发请求写锁,需要CAS操作设置同步状态
            if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                return false;
            //当前线程首次获取同步状态成功
            setExclusiveOwnerThread(current);
            return true;
        }
    

    读锁的获取与释放:
    读锁获取成功的条件:1.写锁此刻被当前线程拥有;2.写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;
    只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,而不是进入等待状态,是因为当前线程此刻仍然具有获取读锁的资格,而不用等待资格;但是当其它线程拥有写锁时,则当前线程获取同步状态失败,失去获取读锁的资格,退出for循环,进入WAITTING状态,并把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态

        protected final int tryAcquireShared(int unused) {
            //只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,
            for (; ; ) {
                Thread current = Thread.currentThread();
                int c = getState();
                int nextC = c + (1 << 16);
                if (nextC < c)
                    throw new Error("Maximum lock count exceeded");
                if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
                    //存在写锁(写状态不为0),且写锁拥有线程不是当前线程,则获取读锁失败
                    return -1;
                if (compareAndSetState(c, nextC))
                    //获取读锁成功
                    return 1;
            }
        }
    

    锁降级:
    锁降级的过程:拥有写锁->预处理(写/改)数据->拥有读锁->释放写锁->使用(读)数据->释放读锁,即读锁降级到写锁;
    占用读锁,释放写锁,读数据,这样做有两个好处:1.使用(读)数据期间,其它数据只读的线程可以获取到读锁,而不至于堵塞,提高了效率;2.保证使用(读)数据的过程中数据是没有发生变化的,因为在释放读锁前,其它线程无法获取写锁来更改数据;

    public class LockDown {
    
        static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        static Lock readLock = lock.readLock();
        static Lock writeLock = lock.writeLock();
    
        static class Thread1 implements Runnable{
            //线程是否完成了数据准备的更新
            static volatile boolean update = false;
    
            @Override
            public void run() {
    
            }
    
            public void processData(){
                readLock.lock();
                if(update == false){
                    readLock.unlock();
                    writeLock.lock();
                    try {
                        //预处理(写/改)数据
                        prepareData();
                        update = true;
                        //先获取读锁
                        readLock.lock();
                    }finally {
                        //然后释放写锁
                        writeLock.unlock();
                    }
                }
                try {
                    //使用(读)数据
                    useData();
                }finally {
                    //最后释放读锁
                    readLock.unlock();
                }
            }
            
            public void prepareData(){}
    
            public void useData(){}
        }
    
    }
    

    注:Java不支持锁升级,锁升级会引起死锁;

    LockSupport工具:

    LockSupport工具类定义了一组public static方法,用来阻塞当前线程、唤醒被阻塞的线程;

    void park(); //阻塞当前线程
    void unpark(Thread thread); //唤醒阻塞的线程thread
    void park(Object blocker); //blocker是当前线程等待的对象(阻塞对象,锁对象??)
    
    Condition接口:

    提供锁的监视器方法:await()、signal()、signalAll(),用于线程间通信;

    //Condition对象通过Lock对象创建
    Condition condition = lock.newCondition();
    

    等待队列:
    当线程调用condition.await()方法时,就把线程及其状态包装成node插入等待队列的尾部,此刻线程进入WAITTING状态;
    等待队列与同步队列不同之处:
    1.同步队列是线程获取获取锁(同步状态失败)时插入的队列,等待队列是线程调用wait()方法时插入的队列;2.同步队列是双链表,而等待队列是单链表;3.一个Condition对象对应一个等待队列,一个Lock对象可以有多个Condition对象,即可以拥有多个等待队列,但只能拥有一个同步队列;
    等待队列与与同步队列相同之处:
    1.共用同步器的静态内部类AbstractQueuedSynchronized.Node来生成节点;2.链表结构相似;
    等待队列的结构:
    单向FIFO链表,head节点指向链表的头,tail节点指向链表的尾;

    等待队列的结构
    线程进入等待状态:
    线程进入等待状态
    1.同步队列head节点的线程构造新节点并加入等待队列;2.释放同步状态;3.唤醒后继节点;4.进入等待状态;
    线程被通知:
    线程被通知
    1.等待队列的head节点移动到同步队列的尾节点上,线程从WAITTING状态进入BLOCKED状态;2.唤醒线程去竞争同步状态;3.非同步队列的首节点,获取同步状态失败,再次阻塞;4.最终通过自旋获取同步状态成功后从await()返回,此刻线程成功获取了锁;

    相关文章

      网友评论

          本文标题:第5章 Java的锁

          本文链接:https://www.haomeiwen.com/subject/wnevaxtx.html