美文网首页
ReentrantLock

ReentrantLock

作者: RealityVibe | 来源:发表于2018-10-16 00:30 被阅读0次

    部分源码分析

    内部类部分

    Sync类

     abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            /**
             * Performs {@link Lock#lock}. The main reason for subclassing
             * is to allow fast path for nonfair version.
             */
            abstract void lock();
    
            /**
             * Performs non-fair tryLock.  tryAcquire is
             * implemented in subclasses, but both need nonfair
             * try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            protected final boolean tryRelease(int releases) {
                // 更新state值
                int c = getState() - releases;
                // 如果该线程不是目前独占锁的拥有者,抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 如果全部锁释放,返回true
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            // 判断是否持有(独占)锁
            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            // Methods relayed from outer class
     
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            // 获取加锁次数
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
            
            // 判断是否加锁
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * Reconstitutes this lock instance from a stream.
             * @param s the stream
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
    

    NonfairSync

     static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             * 加锁操作,先尝试插个队请求锁。插队失败,则正常排队获取。
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    

    FairSync

    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            // 排队获取锁
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             * 正常是递归调用,或没有等待者,或者来到了队列的head,否则不授予权限
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    // 重复加锁(对应的,要释放对应数量的锁),对应可重入锁
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    ReentrantLock类本类

    构造函数
        /**
         * 无参构造函数
         * 默认sync为不公平
         * 同 ReentrantLock(false);
         */
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * 参数fair决定fair/nonfair
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    
    lock()

    获取锁的方式看sync为fair还是nonfair

    NonfairSync中调用lock()时,线程会插队先去竞争锁,竞争失败后才会进入阻塞队列。而FairSync中,线程调用lock()时,直接进入阻塞队列等待。

    /**
     * 获取锁,获取锁的方式看sync为fair还是nonfair
     * <p> 如果锁未被占有或当前线程已经占有锁,则count+1
     * <p> 如果锁已经被占有,且占有者不是当前线程则当前线程进入休眠状态,直到占有锁。
     */
    public void lock() {
        sync.lock();
    }
    
    • FairSync

      final void lock() {
          acquire(1); //acquire()函数的内容见AbstractQueuedSynchronizer源码。
      }
      
    • NonfairSync

    • final void lock() {
          if (compareAndSetState(0, 1))
              setExclusiveOwnerThread(Thread.currentThread());
          else
              acquire(1);//acquire()函数的内容见AbstractQueuedSynchronizer源码。同上
      }
      
    lockInterInterruptibly()
    /**
     * 以独占模式获取,打断后终止。
     * 首先检查中断状态,然后至少调用一次{@link #tryAcquire},成功则返回success。 否则进入queue,通过
     * CAS自旋获取锁(见AbstractQueuedSynchronizer#acquireQueued),直到成功或者被打断
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    

    场景应用

    如何而配合Condition

    // TODO

    中断响应

    
    import java.util.concurrent.locks.ReentrantLock;
    
    public class InterruptThread implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;
    
        public InterruptThread(int lock) {
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
                    lock2.lockInterruptibly();
                } else if (lock == 2) {
                    lock2.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                    }
                    lock1.lockInterruptibly();
                }
                    if (lock1.isHeldByCurrentThread())
                        lock1.unlock();
                    if (lock2.isHeldByCurrentThread())
                        lock2.unlock();
                    System.out.println(Thread.currentThread().getId() + "退出");
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            InterruptThread r1 = new InterruptThread(1);
            InterruptThread r2 = new InterruptThread(2);
            Thread t1 = new Thread(r1);
            Thread t2 = new Thread(r2);
            t1.start();
            t2.start();
            t1.interrupt();
        }
    
    }
    
    

    实现阻塞队列

    • BoundedQueue.java
    package com.yrls.config;
    
    import java.util.Arrays;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedQueue {
        private Integer[] items;//定义为数组,在创建对象时就确定容量
        private Lock lock = new ReentrantLock();
        private Condition notEmpty = lock.newCondition();
        private Condition notFull = lock.newCondition();
    
        private int count;
        private int addIndex,removeIndex;
    
        public BoundedQueue(int size){
            items = new Integer[size];
        }
    
        public void add(Integer object) throws InterruptedException{
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "lock ");
            try{
                if(Thread.currentThread().getName().equals("1") || Thread.currentThread().getName().equals("10") ){
                    for(int i = 0;i<10;++i){
                        System.out.println(i);
                    }
                }
    
                while(count==items.length){
                    notFull.await();
                }
                items[addIndex] = object;
                if(++addIndex==items.length){
                    addIndex = 0;
                }
                count++;
                System.out.println(Thread.currentThread()+" 插入一个元素,数组为:"+Arrays.toString(items));
                notEmpty.signal();
            }finally{
                System.out.println(Thread.currentThread().getName() + " unlock");
                lock.unlock();
            }
        }
    
        @SuppressWarnings("unchecked")
        public Integer remove() throws InterruptedException{
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "  remove-lock");
            try{
                if(Thread.currentThread().getName().equals("1")){
                    for(int i = 0;i<10;++i){
                        System.out.println(i);
                    }
                }
                while(count==0){
                    System.out.println("----start wait----");
                    notEmpty.await();
                }
                Integer temp = items[removeIndex];
                items[removeIndex] = null;
                System.out.println(Thread.currentThread()+" 移除一个元素,数组为:"+Arrays.toString(items));
                if(++removeIndex==items.length){
                    removeIndex=0;
                }
                count--;
                notFull.signal();
                return temp;
            }finally{
                System.out.println(Thread.currentThread().getName() + "  remove-unlock");
                lock.unlock();
            }
        }
    }
    
    • Test.java
    public class New1 {
        private static final Random random = new Random(System.currentTimeMillis());
    
        public static void main(String[] args) throws InterruptedException {
    
            BoundedQueue queue = new BoundedQueue(5);
    
            for(int i=1;i<=6;i++){
                Thread thread = new Thread(new Producter(queue),String.valueOf(i));
                thread.start();
            }
    
            for(int i=1;i<=12;i++){
                Thread thread = new Thread(new Consumer(queue),String.valueOf(i));
                thread.start();
            }
        }
    
        static class Producter implements Runnable{
            private BoundedQueue queue;
            public Producter(BoundedQueue queue){
                this.queue = queue;
            }
            public void produce() throws InterruptedException{
                queue.add(new Integer(random.nextInt(100)));
            }
            @Override
            public void run() {
                try {
                    produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Consumer implements Runnable{
            private BoundedQueue queue;
            public Consumer(BoundedQueue queue){
                this.queue = queue;
            }
            public Integer remove() throws InterruptedException{
                return queue.remove();
            }
            @Override
            public void run() {
                try {
                    remove();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 结果

    为了便于了解对ReentrantLock加锁过程中其它线程的状态,打印内容增加了一些无聊的输出,便于理解。

    1. 可以看到第一个线程占有reentrantlock时,其它线程无法对去竞争reentrantlock。(在thread1中打印数字的这段时间,未被打断)
    2. 可重入锁是针对同线程而言,不同线程不适用。
    3. 可以看到输出的最后部分有线程6\7\9\11\12(不一定是这几个)的加锁但没有解锁,线程6获取锁后,由于资源耗尽,触发condition.await(),6将占有锁并等待signal()。在6释放锁之前,接下来的线程的lock()都将竞争失败,进入队列。
    1lock 
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    Thread[1,5,main] 插入一个元素,数组为:[49, null, null, null, null]
    1 unlock
    2lock 
    Thread[2,5,main] 插入一个元素,数组为:[49, 22, null, null, null]
    2 unlock
    3lock 
    Thread[3,5,main] 插入一个元素,数组为:[49, 22, 60, null, null]
    3 unlock
    4lock 
    Thread[4,5,main] 插入一个元素,数组为:[49, 22, 60, 70, null]
    4 unlock
    3  remove-lock
    Thread[3,5,main] 移除一个元素,数组为:[null, 22, 60, 70, null]
    3  remove-unlock
    5lock 
    Thread[5,5,main] 插入一个元素,数组为:[null, 22, 60, 70, 66]
    5 unlock
    8  remove-lock
    Thread[8,5,main] 移除一个元素,数组为:[null, null, 60, 70, 66]
    8  remove-unlock
    6lock 
    Thread[6,5,main] 插入一个元素,数组为:[60, null, 60, 70, 66]
    6 unlock
    1  remove-lock
    Thread[1,5,main] 移除一个元素,数组为:[60, null, null, 70, 66]
    1  remove-unlock
    2  remove-lock
    Thread[2,5,main] 移除一个元素,数组为:[60, null, null, null, 66]
    2  remove-unlock
    4  remove-lock
    Thread[4,5,main] 移除一个元素,数组为:[60, null, null, null, null]
    4  remove-unlock
    5  remove-lock
    Thread[5,5,main] 移除一个元素,数组为:[null, null, null, null, null]
    5  remove-unlock
    6  remove-lock
    ----start wait----
    7  remove-lock
    9  remove-lock
    10  remove-lock
    11  remove-lock
    12  remove-lock
    

    相关文章

      网友评论

          本文标题:ReentrantLock

          本文链接:https://www.haomeiwen.com/subject/ibcczftx.html