美文网首页
Java concurrent包源码走读(二)

Java concurrent包源码走读(二)

作者: 忘净空 | 来源:发表于2017-03-20 13:03 被阅读90次

    简介

    AQS(AbstractQueuedSynchronizer)是Java并发工具基础,要掌握Java并发工具类首先得熟悉AQS,通过对AQS的学习,我们将进一部理解共享锁、独占锁、公平锁、非公平锁、重入锁等常见锁。AQS作为一个模板方法,它定义并发工具类处理流程,接下来让我们来了解下AQS的处理流程。

    AQS接口

    AQS既然是基于模板方法实现,那它即提供了模板方法同时也提供可重新的方法。

    可重写的方法

    方法名称 描述
    protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
    protected boolean tryRelease(int arg) 独占式释放同步状态,等待后去同步状态的线程将有机会获取同步状态
    protected boolean tryAcquireShare(int arg) 共享式获取同步状态,返回大于等于0的值则成功,反之,获取失败
    protected boolean tryReleaseShare(int arg) 共享式释放同步状态
    protected boolean isHeldExclusively() 当前同步器是否被当前线程独占

    模板方法

    方法名称 描述
    void acquire(int arg) 独占式获取同步状态,当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,将会调用tryAcquire(int arg)方法
    void acquireInterruptibly(int arg) 与acquire(int arg),但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该线程会抛出InterrupteredException并返回
    protected boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly基础上新增超时时间,如果当前线程在超时时间内没有获取到同步状态,返回false,如果获取到返回true
    protected boolean acquireShared(int arg) 共享式获取同步状态,与独占的区别是在同一时刻可以友有多个线程获取同步状态
    protected boolean tryAcquireInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
    protected boolean tryAcquireSharedNanos(int arg,long nanos) 在tryAcquireInterruptibly(int arg)方法上新增超时限制
    boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态后 ,将同步队列中第一个节点包含的线程唤醒
    boolean releaseShared(int arg) 同步式的释放同步状态
    Collection<Tread> getQueuedThreads() 获取等待在同步队列上的线程集合

    AQS基本结构

    AQS依赖内部的同步队列来完成同步状态的管理,同步队列中的节点(Node)用来保存"获取同步状态失败的线程"引用、等待状态以及前驱和后继节点。节点的属性类型与名称及描述:

    属性类型和名称 描述
    int waitStatus 等待状态:(1)CANCELLED,值为1,由于在同步队列中的线程等待超时或者被中断,需要从同步队列中取消等待 (2)SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果或者释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 (3)CONDITION,值是-2,节点在等待队列中,节点线程在Condition上,其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到该同步状态的获取中 (4)PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去 (5)INITIAL,值为0,初始状态
    Node prev 前驱节点
    Node next 后继节点
    Node nextWaiter 等待队列的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARE常量,也就是说节点类型和等待队列中的后继节点共用同一个字段
    Thread thread 获取同步状态的线程

    根据Node基本属性可得下图:AQS基于一个双向链表实现的同步队列,同时有个基于单向链表实现的条件队列,注意这个条件队列不是必须的,它也可以有多个。

    源码走读

    独占式获取锁

    首先我们看一个流程图,熟悉其大致流程

    源码

    //acquire(int arg) 
    public final void acquire(int arg) {
            //首先调用tryAcquire(arg)方法,这个具体在子类中实现,通过这个方法可以实现公平锁和不公平锁,如果这个方法获取锁成功则直接返回,不再执行后面的代码
            //获取失败后便执行addWaiter(Node.EXCLUSIVE), arg)方法,将其添加到队列中,同时标记为独占锁
            //acquireQueued(final Node node, int arg)方法中首先尝试获取锁,如果获取不到则阻塞线程
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    //addWaiter(Node mode)
    private Node addWaiter(Node mode) {
            //创建当前线程的Node
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            //如果tail节点不为null
            if (pred != null) {
                //当前线程节点前驱指向tail
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {//CAS 当前线程Node设置为tail成功
                    //上个tail的后继节点指向现在的tail
                    pred.next = node;
                    //返回当前线程Node
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    //enq(node);
    private Node enq(final Node node) {
            for (;;) {//自旋
                Node t = tail;
                if (t == null) { // 如果tail节点为null
                    //初始化head,同时tail指向head
                    //从后面的代码可以看出头结点是傀儡节点,
                    //每次通过判读线程节点的前驱是否是头结点来决定是否获取锁
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {//将node插入到队尾
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    //acquireQueued(final Node node, int arg)
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {自旋
                    //获取当前线程节点的前驱继节点
                    final Node p = node.predecessor();
                    //如果前驱是头节点,尝试获取锁
                    if (p == head && tryAcquire(arg)) {//获取成功
                        //设置当前节点是头节点
                        setHead(node);
                        p.next = null;
                        failed = false;
                        return interrupted;
                    }
                    //获取失败阻塞其他线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                //获取失败
                if (failed)
                    //取消正在进行的尝试获取
                    cancelAcquire(node);
            }
        }
    
    // shouldParkAfterFailedAcquire(Node pred, Node node)
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           //如果前驱节点状态是Node.SIGNAL直接返回,
           //Node.SIGNAL表示前驱获取锁后会唤醒后继节点的线程
            return true;
        if (ws > 0) {
            //pred.waitStatus表示Node.CANCELLED,下面的操作跳过状态为CANCELLED的所有节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //waitStatus 等于0(初始化)或PROPAGATE,
            //说明线程还没有park,会先重试确定无法acquire到再park,
            //这里主要是共享锁时传播共享状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    //parkAndCheckInterrupt()
    private final boolean parkAndCheckInterrupt() {
            //阻塞当前线程
            LockSupport.park(this);
            //返回线程状态同时清除标记位,在acquire(int arg)中会再次中断状态的中断线程
            return Thread.interrupted();
        }
    
    //cancelAcquire(Node node)
    private void cancelAcquire(Node node) {
        //如果节点为null,直接返回
        if (node == null)
            return;
        node.thread = null;
        //跳过当前节点的所有前驱是CANCELLED状态的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
        //标记节点状态为Node.CANCELLED
        node.waitStatus = Node.CANCELLED;
        //如果当期节点是尾节点则直接删除
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            //下面的操作是如果当前线程的节点的前驱不能满足唤醒后继的条件,则删除当前节点,
            //满足则唤醒当前节点的后继节点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
    
            node.next = node; // help GC
        }
    }
    

    独占式释放锁

    public final boolean release(int arg) {
        if (tryRelease(arg)) {//tryRelease(arg)方法子类重新
            Node h = head;
            if (h != null && h.waitStatus != 0) //head存在
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    通过上面连个方法我们可以了解独占锁获取释放锁的流程,除去独占锁外还有共享锁、带有超时时间的独占式和共享锁。共享锁的获取锁后唤醒后继节点传播共享,具体大家可以查看源码。

    自己简单的理解:其实具体的流程很简单,首先创建一个节点,head和tail都指向这个节点,然后各个线程去获取锁,其实就是判断它的前驱节点是不是头结点同时还有tryAcquire()是否返回true。如果获取成功设置当前节点为头节点,获取失败则将其插入到同步队列,设置状态为SIGNAL同时阻塞当前线程,SIGNAL可以保证前驱节点释放后唤醒当前节点,当前节点可以放心阻塞。然后每个出去头结点的节点开始自旋尝试获取锁,直到获取到锁返回。

    相关文章

      网友评论

          本文标题:Java concurrent包源码走读(二)

          本文链接:https://www.haomeiwen.com/subject/ffgbnttx.html