美文网首页
ReentrantLock源码分析(三)

ReentrantLock源码分析(三)

作者: 语童粑粑 | 来源:发表于2018-12-14 11:31 被阅读0次

synchronized有个重要的功能,可以通过object中的wait()和 notify()方法实现生产者/消费者。ReentrantLock基于Condition也同样可以实现,而且相对于synchronized的无差别通知,ReentrantLock可以选择性的通知,减少了很多无用的线程竞争。本文主要就是分析下ReentrantLock是怎么通过Condition实现生产者/消费者模式的。

public class ProducerCustomerWithLock {
    Executor pool = Executors.newFixedThreadPool(5);
    private List<String> storeList = new LinkedList<>();//仓库
    //仓库容量
    private int MAX_VALUE = 5;
    //仓库为空
    private int MIN_VALUE = 0;
    // 线程锁
    private Lock lock = new ReentrantLock();
    //仓库满了,绑定生产者线程
    private Condition full = lock.newCondition();

    //仓库为空,绑定消费者线程
    private Condition empty = lock.newCondition();
    //生产者
    private class producer implements Runnable {

        @Override
        public void run() {
            while (true) {
                produce();
            }
        }

        private void produce() {
            System.out.println(Thread.currentThread().getName() + "进入仓库,准备生产!");

            try {
                lock.lock();
                if (storeList.size() == MAX_VALUE) {
                    System.out.println("仓库已满,等待消费");
                    Thread.sleep(1000);
                    full.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
                }
                if (storeList.size() < MAX_VALUE) {
                    String product = "产品" + new Random().nextInt();
                    storeList.add(product);
                    System.out.println(Thread.currentThread().getName() + "往仓库中生产了一个产品!" + product);
                }
                Thread.sleep(1000);
                empty.signalAll();//唤醒消费者线程
            } catch (InterruptedException e) {
                System.out.println("中断异常");
//                    e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }
    }

    private class consumer implements Runnable {

        @Override
        public void run() {
            while (true) {
                consume();
            }
        }

        private void consume() {
            System.out.println(Thread.currentThread().getName() + "进入仓库,准备消费!");

                try {
                    lock.lock();
                    if (storeList.size() == MIN_VALUE) {
                        System.out.println("仓库已空,等待生产");
                        Thread.sleep(1000);
                        empty.await(); //当前线程等待,让其他线程继续执行,可以看出wait是释放锁的
                    }
                    if (storeList.size() > MIN_VALUE) {
                        System.out.println(Thread.currentThread().getName() + "从仓库取得产品:" + storeList.remove(0));
                    }
                    Thread.sleep(1000);
                    full.signalAll();//唤醒生产者线程
                } catch (InterruptedException e) {
                    System.out.println("中断异常");
//                    e.printStackTrace();
                }
                finally {
                    lock.unlock();
                }
            }

    }

    //启动生产者和消费者线程
    public void start() {
        for (int i = 1; i < 5; i++) {
            pool.execute(new producer());
            pool.execute(new consumer());
        }

    }

    public static void main(String[] args) {
        ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
        pc.start();
    }
}

打印结果如下,生产者消费者交替运行:

pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品-885144207
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1进入仓库,准备生产!
pool-1-thread-2从仓库取得产品:产品-885144207
pool-1-thread-2进入仓库,准备消费!
pool-1-thread-3往仓库中生产了一个产品!产品-1433422526
pool-1-thread-3进入仓库,准备生产!
pool-1-thread-4从仓库取得产品:产品-1433422526
pool-1-thread-4进入仓库,准备消费!
pool-1-thread-5往仓库中生产了一个产品!产品-137356193
pool-1-thread-5进入仓库,准备生产!
pool-1-thread-1往仓库中生产了一个产品!产品831540660
...

接下来就直接进入正题吧,看下ConditionObject.await()方法。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

先判断线程状态是否被中断,如果被中断则抛出异常。然后开始构建Condition队列,看下addConditionWaiter()方法。

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

先判断condition尾节点的waitStatus是否是condition状态。如果不是只能是cancel状态,则把队列中是cancel状态的节点移除。找到第一个是condition状态的尾节点。

新建一个Node节点,模式是CONDITION,如果之前不存在尾节点。则把当前节点作为头结点和尾节点,否则把当前节点置为尾节点。然后执行fullyRelease(node)方法。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

首先进入到await的方法的前提条件是获取到锁,所以这一步是释放锁并且唤醒AQS队列头结点的后置节点。这里为什么要释放锁呢,很简单await方法会阻塞当前线程。当前线程如果不释放锁就会导致后面的线程获取不到锁从而阻塞。极大的影响性能还可能造成死锁。所以await方法是会释放锁的。

再看isOnSyncQueue(node)是判断当前节点是否在AQS队列中,如果不在则阻塞当前节点所在线程。直到signal方法唤醒该线程。那么先看下signal():

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
 private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先把当前节点waitStatus状态CAS操作为0。如果设置失败,且该节点为condition头结点,则把该节点排除出队列。
如果设置成功,则把该节点插入到AQS队列中。也就是说Signal()唤醒后不是立即执行的。而是进入到AQS队列中排队。
如果该节点被取消了或者已经被设置成了SIGNAL,则取消阻塞该节点所在线程。其他情况由AQS头节点唤醒。

再回过头看await方法。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

之前睡眠的线程被唤醒了,改节点已经加入到了AQS队列可以退出循环了,然后主要就是去获取锁并返回线程的中断状态。

相关文章

网友评论

      本文标题:ReentrantLock源码分析(三)

      本文链接:https://www.haomeiwen.com/subject/acgwhqtx.html