美文网首页
多线程-AQS-Condition

多线程-AQS-Condition

作者: 麦大大吃不胖 | 来源:发表于2021-01-28 22:57 被阅读0次

by shihang.mai

使用

public class ConTest {

    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ConTest test = new ConTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        consumer.start();
        producer.start();
    }

    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        private void consume() {

            try {
                lock.lock();
                System.out.println("我在等一个新信号"+this.currentThread().getName());
                condition.await();

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                System.out.println("拿到一个信号"+this.currentThread().getName());
                lock.unlock();
            }

        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            try {
                lock.lock();
                System.out.println("我拿到锁"+this.currentThread().getName());
                condition.signalAll();
                System.out.println("我发出了一个信号:"+this.currentThread().getName());
            } finally{
                lock.unlock();
            }
        }
    }

}

/*
结果:
我在等一个新信号Thread-1
我拿到锁Thread-0
我发出了一个信号:Thread-0
拿到一个信号Thread-1
*/

源码分析

Lock接口中还有一个方法newCondition,创建一个条件队列。ReentrantLock的内部类Sync如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
  final ConditionObject newCondition() {
            return new ConditionObject();
        }
}

而ConditionObject是AQS中的类

public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;
        public ConditionObject() { }

        //......
}
  1. 看上面使用的例子,Condition是需要配合Lock使用的,而Lock原理又依赖AQS,所以ConditionObject定义在AQS中。
  2. condition内部维护了一个单向的等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态
  3. 看上面ConditionObject,有两个属性firstWaiter和lastWaiter。它们都复用了AQS中CLH队列中的Node对象,不过只用了Node中的nextWaiter属性。当然一个Lock可以new多次Condition,形成多个Condition队列,如下图
多个Condition队列

await

public class ConditionObject implements Condition, java.io.Serializable {

  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

      private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

}
  1. 先将当前节点封装为Conditon Node, firstWaiter和lastWaiter都指向它,如果不是第一个,直接将上一个的nextWaiter指向自身,并将lastWaiter指向自身。即形成单链表
  2. 释放AQS锁,并且唤醒在同步队列中头结点的后继节点引用的线程。
  3. 当线程第一次调用condition.await()方法时,会进入到这个while()循环中,挂起,当 当前线程被中断或者其他线程调用condition.signal/condition.signalAll方法将当前节点移动到了同步队列后,就会退出循环
  4. 退出循环后,调用acquireQueued放到CLH队列(ReentranLock中详细说了,这里不说)

主逻辑需要调用的AQS代码如下:

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

signal

public class ConditionObject implements Condition, java.io.Serializable {
    
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    }

主逻辑中AQS的辅助代码如下:

final boolean transferForSignal(Node node) {
        
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
  1. 将等待队列中的头节点Condition Node(waitState=-2)改为waitState=0
  2. 将节点放到CLH队列
condition队列迁移

singleAll也就将整个等待队列一个个往CLH队列放,不详细说.

相关文章

  • 多线程-AQS-Condition

    by shihang.mai 使用 源码分析 Lock接口中还有一个方法newCondition,创建一个条件队列...

  • iOS多线程 NSOperation

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程 pthread、NSThread

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程: GCD

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程运用

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程基础

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • 多线程介绍

    一、进程与线程 进程介绍 线程介绍 线程的串行 二、多线程 多线程介绍 多线程原理 多线程的优缺点 多线程优点: ...

  • iOS进阶之多线程管理(GCD、RunLoop、pthread、

    深入理解RunLoopiOS多线程--彻底学会多线程之『GCD』iOS多线程--彻底学会多线程之『pthread、...

  • iOS多线程相关面试题

    iOS多线程demo iOS多线程之--NSThread iOS多线程之--GCD详解 iOS多线程之--NSOp...

  • 多线程之--NSOperation

    iOS多线程demo iOS多线程之--NSThread iOS多线程之--GCD详解 iOS多线程之--NSOp...

网友评论

      本文标题:多线程-AQS-Condition

      本文链接:https://www.haomeiwen.com/subject/xuaxtltx.html