美文网首页并发编程
Condition源码分析

Condition源码分析

作者: 拥抱孤独_to | 来源:发表于2019-05-30 11:24 被阅读0次

并发源码分析篇:

Condition类似于Object方法中的await和signal,其作用都是将线程挂起和唤醒。但是Condition更加灵活,可以通过newCondition方法获取多个等待队列。

下面这段代码通过Condition实现了一个生产者和消费者模式,同时这段代码也类似于ArrayBlockingQueue阻塞队列的实现,当数组满了,生产者将会阻塞,只有当消费者消费了消息后唤醒生产者生产者才会继续工作,同理消费者也是,当数组中没有数据的话将会被阻塞,直到数组中有数据才会被唤醒继续工作,Condition的使用必须结合Lock锁的使用,需要先获取到锁,否则会抛异常。

package com.ltf.study.concurennt;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    static Lock lock = new ReentrantLock();
    static Condition fullConditon = lock.newCondition();
    static Condition emptyCondition = lock.newCondition();
    static String[] arr = new String[5];
    static int takeindex;
    static int putindex;
    static int count;
    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            provider(i+"");
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 10; i++) {
            consumer();
        }
    }
    public static void provider(String str) {
        new Thread(()->{
            try {
                lock.lock();
                while (count >= 5) {
                    fullConditon.await();
                }
                System.out.println(Thread.currentThread().getName()+"==生产了:"+str);
                arr[putindex] = str;
                count++;
                putindex = ++putindex <5 ? putindex:0;
                emptyCondition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"provider").start();
    }

    public static void consumer() {
        new Thread(()->{
            try {
                lock.lock();
                while (count == 0) {
                    emptyCondition.await();
                }
                System.out.println(Thread.currentThread().getName()+"消费:"+arr[takeindex]);
                arr[takeindex] = null;
                takeindex = ++takeindex <5 ? takeindex:0;
                count--;
                fullConditon.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"comsumer").start();
    }
}

运行结果

provider==生产了:0
provider==生产了:2
provider==生产了:1
provider==生产了:3
provider==生产了:4
comsumer消费:0
comsumer消费:2
comsumer消费:1
provider==生产了:5
comsumer消费:3
comsumer消费:4
comsumer消费:5

锁的使用就不用多说了,可以看ReentrantLock源码分析

下面是等待队列的结构,为单向链表



当线程获取锁,调用await方法时,我们来看他做了什么

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 构建等待队列链表
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 竞争锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在这里首先会构建链表,然后释放锁,唤醒同步队列中的下一个节点去获取锁,然后阻塞当前线程。

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

这里同样假设有A,B,C三个线程分别获取到锁调用await方法,就会构成如下队列并且阻塞,只有等到其他线程调用signal方法将节点加入到同步队列末尾等到前置节点将该节点唤醒才能继续在上次挂起的地方重新执行。



这时我们来看signal方法,该方法就是获取等待队列中的首节点传入doSignal方法中。

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

doSignal方法做了以下几件事

  1. 如果当前首节点没有下一个节点,将尾节点置空
  2. 将当前节点的下一个节点引用置空,也就是从等待队列中移除了。
  3. 将当前移除的等待节waitState状态设置为0并且加入到同步队列末尾,并且将同步队列中对应的前置节点的waitState状态设置为-1。
    signal做的事情其实就是将等待队列中的节点移到同步队列中

此时的同步队列和等待队列中的状态如下



A节点被移到了同步队列中,当被同步队列中的前置节点唤醒后,A节点继续会在上次挂起的地方运行。

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

设置中断状态为了后面时直接抛异常还是重新中断,并且继续执行判断isOnSyncQueue

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

此时的node waitState状态为0,并且前置节点不等于null,所以结果会返回true,所以跳出了while循环继续执行acquireQueued方法去抢占锁,继续抢占的原因是这是非公平锁,有可能被同步队列之外的其他线程获取到
至于sigalAll方法就是循环将每个节点移到同步队列中

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

到这里Condition的基本实现结束了。

总结

Condition我们可以这样理解,就是在同步队列之外多了其他的等待队列,wait方法就是将同步队列中的节点移到等待队列中,而signal方法则是将等待队列中的节点移动到同步队列中,从而达到线程之间的通信

await
signal

相关文章

  • java并发-Condition接口

    Condition的简单使用 使用Condition实现的有界队列 核心方法 await()方法源码分析 sign...

  • Condition源码分析

    Java对象都有一组监视器方法:wait,notify,notifyAll,而synchronized本身就是利用...

  • Condition源码分析

    并发源码分析篇: ReentrantLock源码分析 ReentrantReadWriteLock源码分析 Con...

  • Condition源码分析

    我们先来看看Condtion类。 我们先来看个例子。 运行结果: 我们来分析下: 当WaitThread拿到锁之后...

  • J.U.C:Condition

    Condition源码分析   调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同...

  • Java 源码分析-Condition

      前面对Java中的锁进行了简单的分析,锁的使用和原理整体来说还是比较简单。今天我们来分析一下Condition...

  • ReentrantLock condition 源码分析

    本篇主要介绍ReentrantLock 中 condition的await/signal方法的实现原理。 想忽略整...

  • 深入解析AbstractQueuedSynchronizer源码

    前面分析了AbstractQueuedSynchronizer实现的其他两部分:Condition源码解析独占模式...

  • JUC之Condition源码分析

    原文出处:https://www.zzwzdx.cn Condition接口定义了类似Object的监视器方法,它...

  • 并发编程之 Condition 源码分析

    前言 Condition 是 Lock 的伴侣,至于如何使用,我们之前也写了一些文章来说,例如 使用 Reen...

网友评论

    本文标题:Condition源码分析

    本文链接:https://www.haomeiwen.com/subject/gueetctx.html