美文网首页Java 杂谈
深入Java多线程之JUC锁(一)--互斥锁ReentrantL

深入Java多线程之JUC锁(一)--互斥锁ReentrantL

作者: JackFrost_fuzhu | 来源:发表于2019-01-17 09:40 被阅读4次

    文章结构:

    1)ReentrantLock介绍
    2)使用demo
    3)形象说明其原理
    4)源码阅读分析
    5)思考几个问题

    一、ReentrantLock介绍:

    (1)简介:

    ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

    ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。

    ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待)。

    (2)对比synchronized归纳为三个方面:

    1)可重入互斥锁:

    可重入:意思就是可被单个线程多次获取。
    synchronized也是可重入的

    2)公平与非公平:

    synchronized是非公平的。
    ReentrantLock有两种模式,默认是非公平。

    在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

    3)等待可中断

    也就是持有锁的线程长期不释放时,正等待的线程可选择放弃。不像synchronized会导致死锁,然后交给JVM处理死锁。

    二、使用demo:

    我们用最经典的生产者消费者代码来演示ReentrantLock。

    (1)纯ReentrantLock使用:

    定义仓库

    public class Depot {
    
        private int size;        // 仓库的实际数量
        private Lock lock;        // 独占锁
    
        public Depot() {
            this.size = 0;
            this.lock = new ReentrantLock();
        }
    
        public void produce(int val) {
            lock.lock();
            try {
                size += val;
                System.out.println("生产者"+Thread.currentThread().getName()+" 生产"+val+"数量"+ " 仓库数量变为:"+ size);
            } finally {
                lock.unlock();
            }
        }
    
        public void consume(int val) {
            lock.lock();
            try {
                size -= val;
                System.out.println("消费者"+Thread.currentThread().getName()+" 消费"+val+"数量"+ " 仓库数量变为:"+ size);
            } finally {
                lock.unlock();
            }
        }
    };
    

    定义生产者消费者

    public class Customer {
        private Depot depot;
    
        public Customer(Depot depot) {
            this.depot = depot;
        }
    
        // 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
        public void consume(final int val) {
            new Thread() {
                public void run() {
                    depot.consume(val);
                }
            }.start();
        }
    }
    
    public class Producer {
        private Depot depot;
    
        public Producer(Depot depot) {
            this.depot = depot;
        }
    
        // 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
        public void produce(final int val) {
            new Thread() {
                public void run() {
                    depot.produce(val);
                }
            }.start();
        }
    }
    

    测试

    public class Test {
        public static void main(String[] args) {
            Depot depot = new Depot();
            Producer producer = new Producer(depot);
            Customer customer = new Customer(depot);
    
            producer.produce(70);
            producer.produce(120);
            customer.consume(60);
            customer.consume(150);
            producer.produce(130);
        }
    }
    

    运行结果:

    生产者Thread-0 生产70数量 仓库数量变为:70
    消费者Thread-3 消费150数量 仓库数量变为:-80
    生产者Thread-4 生产130数量 仓库数量变为:50
    消费者Thread-2 消费60数量 仓库数量变为:-10
    生产者Thread-1 生产120数量 仓库数量变为:110
    

    分析:
    1)Depot 是个仓库。可以通过生产者去生产货物,通过消费者去消费货物。
    2)通过ReentrantLock独占锁,可以锁住仓库,每个线程的修改都是原子性质,并且是有序的。
    3)但是我们可以发现一个问题。现实中,仓库的容量不可能为负数,仓库的容量是有限制的。所以基于这样的生产消费模型,有实际业务需求,我们又需要利用ReentrantLock去设计。

    (2)结合Condition条件队列解决我们的实际需求:

    public class Depot {
        private int       capacity;    // 仓库的容量
        private int       size;        // 仓库的实际数量
        private Lock      lock;        // 独占锁
        private Condition fullCondtion;            // 生产条件
        private Condition emptyCondtion;        // 消费条件
    
        public Depot(int capacity) {
            this.capacity = capacity;
            this.size = 0;
            this.lock = new ReentrantLock();
            this.fullCondtion = lock.newCondition();
            this.emptyCondtion = lock.newCondition();
        }
    
        public void produce(int val) {
            lock.lock();
            try {
                // left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
                int left = val;
                while (left > 0) {
                    // 库存已满时,等待“消费者”消费产品。
                    while (size >= capacity) {
                        fullCondtion.await();
                    }
                    // 获取“实际生产的数量”(即库存中新增的数量)
                    // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
                    // 否则“实际增量”=“想要生产的数量”
                    int inc = (size + left) > capacity ? (capacity - size) : left;
                    size += inc;
                    left -= inc;
                    System.out.println(
                            "生产者" + Thread.currentThread().getName() + " 本线程想生产" + val + " 本次生产后还要生产" + left + " 仓库增加" + inc + " 仓库目前实际数量"
                                    + size);
                    // 通知“消费者”可以消费了。
                    emptyCondtion.signal();
                }
            } catch (InterruptedException e) {
    
            } finally {
                lock.unlock();
            }
        }
    
        public void consume(int val) {
            lock.lock();
            try {
                // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
                int left = val;
                while (left > 0) {
                    // 库存为0时,等待“生产者”生产产品。
                    while (size <= 0) {
                        emptyCondtion.await();
                    }
                    // 获取“实际消费的数量”(即库存中实际减少的数量)
                    // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
                    // 否则,“实际消费量”=“客户要消费的数量”。
                    int dec = (size < left) ? size : left;
                    size -= dec;
                    left -= dec;
                    System.out.printf(
                            "消费者" + Thread.currentThread().getName() + " 本线程想消费" + val + " 本次消费后还要消费" + left + " 仓库减少" + dec + " 仓库目前实际数量"
                                    + size);
                    fullCondtion.signal();
                }
            } catch (InterruptedException e) {
            } finally {
                lock.unlock();
            }
        }
    
        public String toString() {
            return "capacity:" + capacity + ", actual size:" + size;
        }
    }
    
    

    生产者消费者:

    public class Customer {
        private Depot depot;
    
        public Customer(Depot depot) {
            this.depot = depot;
        }
    
        // 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
        public void consume(final int val) {
            new Thread() {
                public void run() {
                    depot.consume(val);
                }
            }.start();
        }
    }
    
    public class Producer {
        private Depot depot;
    
        public Producer(Depot depot) {
            this.depot = depot;
        }
    
        // 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
        public void produce(final int val) {
            new Thread() {
                public void run() {
                    depot.produce(val);
                }
            }.start();
        }
    }
    

    测试:

    public class Test {
        public static void main(String[] args) {
            Depot depot = new Depot(100);
            Producer producer = new Producer(depot);
            Customer customer = new Customer(depot);
    
            producer.produce(70);
            producer.produce(120);
            customer.consume(60);
            customer.consume(150);
            producer.produce(130);
        }
    }
    

    结果:

    生产者Thread-0 本线程想生产70 本次生产后还要生产0 仓库增加70 仓库目前实际数量70
    消费者Thread-2 本线程想消费60 本次消费后还要消费0 仓库减少60 仓库目前实际数量10
    生产者Thread-4 本线程想生产130 本次生产后还要生产40 仓库增加90 仓库目前实际数量100
    消费者Thread-3 本线程想消费150 本次消费后还要消费50 仓库减少100 仓库目前实际数量0
    生产者Thread-4 本线程想生产130 本次生产后还要生产0 仓库增加40 仓库目前实际数量40
    消费者Thread-3 本线程想消费150 本次消费后还要消费10 仓库减少40 仓库目前实际数量0
    生产者Thread-1 本线程想生产120 本次生产后还要生产20 仓库增加100 仓库目前实际数量100
    消费者Thread-3 本线程想消费150 本次消费后还要消费0 仓库减少10 仓库目前实际数量90
    生产者Thread-1 本线程想生产120 本次生产后还要生产10 仓库增加10 仓库目前实际数量100
    

    从结果来看,解决了仓库的容量不可能为负数,仓库的容量是有限制的这两个实际业务需求。

    (3)对比一下使用synchronized如何解决我们的这两个实际业务需求。

    三、形象说明其原理:

    背景:一群人排队领钱。一个家庭的人同属一个线程。

    (1)公平锁:

    1)非重入情况

    在这里插入图片描述

    2)重入情况:

    一家人,就是同一个线程,那就优先。


    在这里插入图片描述

    (2)非公平锁:

    直接尝试插队。


    在这里插入图片描述

    尝试失败,那就去队尾


    在这里插入图片描述

    (3)如果加入条件队列呢?

    1)公平锁:

    在这里插入图片描述

    2)非公平锁:

    在这里插入图片描述

    (4)如果多个条件队列呢?

    在这里插入图片描述

    1)公平锁下:

    在这里插入图片描述

    然后变成这样的队列


    在这里插入图片描述

    2)非公平锁下:

    在这里插入图片描述

    去争夺锁的占有权


    在这里插入图片描述

    如果成功占有


    在这里插入图片描述

    四、源码阅读分析:

    //实现Lock接口
    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
        // 同步队列
        private final Sync sync;
    
        //抽象同步器Sync继承AQS,子类可以非公平或者公平锁
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
            // 获取锁
            abstract void lock();
            // 非公平锁的尝试获取
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 获取状态
                int c = getState();
                if (c == 0) {
                // 跟公平锁获取的区别时,这里少了判断队列是否为空的函数hasQueuedPredecessors
                // 即不管排队队列是否为空,该线程都将直接尝试获取锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
                else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁
                    int nextc = c + acquires;// 增加重入次数
                    if (nextc < 0) // 超过了int的表示范围
                        throw new Error("Maximum lock count exceeded");
                    // 设置状态
                    setState(nextc);
                    return true;
                }
                return false;
            }
             //tryRelease方法获取状态并减去releases的值,如果为0表示锁完全被释放
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                //只有持有锁的线程才能进行释放动作
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    //锁被释放,独占线程设置为null
                    setExclusiveOwnerThread(null);
                }
                //更改状态
                setState(c);
                return free;
            }
            // 判断资源是否被当前线程占有
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
            // 新生一个条件
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            // 返回资源的占用线程
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
            // 返回状态
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
            // 资源是否被占用
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * Reconstitutes the instance from a stream (that is, deserializes it).
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
        // 非公平锁
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
           // 获得锁
            final void lock() {
            // 比较并设置状态成功,状态0表示锁没有被占用
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 锁已经被占用,或者set失败;以独占模式获取对象
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        // 公平锁
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
            // 以独占模式获取对象,忽略中断
                acquire(1);
            }
    
            // 尝试公平获取锁
            protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {// 状态为0,锁没被占用
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        // 设置当前线程独占
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)// 超过了int的表示范围
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
       
        public ReentrantLock() {
        // 默认非公平策略
            sync = new NonfairSync();
        }
    
        
        public ReentrantLock(boolean fair) {
            //可以传递参数确定采用公平策略或者是非公平策略,参数为true表示公平策略
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
        //委托给Sync同步器
        public void lock() {
            sync.lock();
        }
    
       //委托给Sync同步器
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        //委托给Sync同步器
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    
        //委托给Sync同步器
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
       //委托给Sync同步器
        public void unlock() {
            sync.release(1);
        }
        //委托给Sync同步器
        public Condition newCondition() {
            return sync.newCondition();
        }
    //委托给Sync同步器
        public int getHoldCount() {
            return sync.getHoldCount();
        }
    
        //委托给Sync同步器
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }
    
        //委托给Sync同步器
        public boolean isLocked() {
            return sync.isLocked();
        }
    
        //委托给Sync同步器
        public final boolean isFair() {
            return sync instanceof FairSync;
        }
    
        //委托给Sync同步器
        protected Thread getOwner() {
            return sync.getOwner();
        }
    
       //委托给Sync同步器
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
      //委托给Sync同步器
        public final boolean hasQueuedThread(Thread thread) {
            return sync.isQueued(thread);
        }
    
         //委托给Sync同步器
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
      //委托给Sync同步器
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
        //委托给Sync同步器,管理等待队列
        public boolean hasWaiters(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    
       //委托给Sync同步器,管理等待队列
        public int getWaitQueueLength(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    
        //委托给Sync同步器,管理等待队列
        protected Collection<Thread> getWaitingThreads(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    
        
        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        }
    }
    
    

    五、思考几个问题

    (1)关键的CAS方法的设计

    protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    对应unsafe类compareAndSwapInt方法的四个参数分别是:
    1)var1:需要改变的对象
    2)var2:偏移量(即之前求出value offset的值--在本对象中变量位置,对象中要修改变量的偏移量)
    3)var3:期待的值
    4)var4:更新后的值
    如果this.value的值与expect这个值相等,那么则将value修改为update这个值,并返回一个true。
    如果调用该方法时,this.value的值与expect这个值不相等,那么不做任何操作,并返回一个false。

    (2)为什么要自己产生中断?

    就是线程等候可以自己中断也可以别人中断。
    即使在等待队列中,线程在阻塞中被中断唤醒而获得CPU的执行权,但是如果此线程前面还有线程,根据公平性,依旧不能拿到锁,再次阻塞。所以要维持中断状态,重新产生中断。

    (3)理解下流程图中所说的waitStatus

    1)设计原因:

    如果获取锁失败,判断是否应该挂起当前线程,可见,挂起线程是有条件的。
    这是AQS静态内部类Node的成员变量,用于记录Node对应的线程等待状态。

    2)waitSatus值含义:

    1- SIGNAL

    从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己,值为:-1

    2- CANCELLED

    当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED的意思。

    通俗来说,因超时或中断,节点会被设为取消状态,被取消状态则不该去竞争锁,只能保持取消状态不变,不能转换状态。这个状态的节点会踢出队列,被GC收回。

    3- CONDITION

    线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来激活,值为-2。

    4- PROPAGATE

    读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个doReleaseShared()动作,内部也是一个循环,当判定后续的节点状态为0时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。

    5- 状态0

    初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。


    问题待续......

    更多文章点此处

    相关文章

      网友评论

        本文标题:深入Java多线程之JUC锁(一)--互斥锁ReentrantL

        本文链接:https://www.haomeiwen.com/subject/wlpbdqtx.html