美文网首页
Java concurrent包源码走读(二)

Java concurrent包源码走读(二)

作者: 忘净空 | 来源:发表于2017-03-20 13:03 被阅读90次

简介

AQS(AbstractQueuedSynchronizer)是Java并发工具基础,要掌握Java并发工具类首先得熟悉AQS,通过对AQS的学习,我们将进一部理解共享锁、独占锁、公平锁、非公平锁、重入锁等常见锁。AQS作为一个模板方法,它定义并发工具类处理流程,接下来让我们来了解下AQS的处理流程。

AQS接口

AQS既然是基于模板方法实现,那它即提供了模板方法同时也提供可重新的方法。

可重写的方法

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待后去同步状态的线程将有机会获取同步状态
protected boolean tryAcquireShare(int arg) 共享式获取同步状态,返回大于等于0的值则成功,反之,获取失败
protected boolean tryReleaseShare(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否被当前线程独占

模板方法

方法名称 描述
void acquire(int arg) 独占式获取同步状态,当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,将会调用tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg),但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该线程会抛出InterrupteredException并返回
protected boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly基础上新增超时时间,如果当前线程在超时时间内没有获取到同步状态,返回false,如果获取到返回true
protected boolean acquireShared(int arg) 共享式获取同步状态,与独占的区别是在同一时刻可以友有多个线程获取同步状态
protected boolean tryAcquireInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
protected boolean tryAcquireSharedNanos(int arg,long nanos) 在tryAcquireInterruptibly(int arg)方法上新增超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态后 ,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 同步式的释放同步状态
Collection<Tread> getQueuedThreads() 获取等待在同步队列上的线程集合

AQS基本结构

AQS依赖内部的同步队列来完成同步状态的管理,同步队列中的节点(Node)用来保存"获取同步状态失败的线程"引用、等待状态以及前驱和后继节点。节点的属性类型与名称及描述:

属性类型和名称 描述
int waitStatus 等待状态:(1)CANCELLED,值为1,由于在同步队列中的线程等待超时或者被中断,需要从同步队列中取消等待 (2)SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果或者释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 (3)CONDITION,值是-2,节点在等待队列中,节点线程在Condition上,其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到该同步状态的获取中 (4)PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去 (5)INITIAL,值为0,初始状态
Node prev 前驱节点
Node next 后继节点
Node nextWaiter 等待队列的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARE常量,也就是说节点类型和等待队列中的后继节点共用同一个字段
Thread thread 获取同步状态的线程

根据Node基本属性可得下图:AQS基于一个双向链表实现的同步队列,同时有个基于单向链表实现的条件队列,注意这个条件队列不是必须的,它也可以有多个。

源码走读

独占式获取锁

首先我们看一个流程图,熟悉其大致流程

源码

//acquire(int arg) 
public final void acquire(int arg) {
        //首先调用tryAcquire(arg)方法,这个具体在子类中实现,通过这个方法可以实现公平锁和不公平锁,如果这个方法获取锁成功则直接返回,不再执行后面的代码
        //获取失败后便执行addWaiter(Node.EXCLUSIVE), arg)方法,将其添加到队列中,同时标记为独占锁
        //acquireQueued(final Node node, int arg)方法中首先尝试获取锁,如果获取不到则阻塞线程
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

//addWaiter(Node mode)
private Node addWaiter(Node mode) {
        //创建当前线程的Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //如果tail节点不为null
        if (pred != null) {
            //当前线程节点前驱指向tail
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//CAS 当前线程Node设置为tail成功
                //上个tail的后继节点指向现在的tail
                pred.next = node;
                //返回当前线程Node
                return node;
            }
        }
        enq(node);
        return node;
    }

//enq(node);
private Node enq(final Node node) {
        for (;;) {//自旋
            Node t = tail;
            if (t == null) { // 如果tail节点为null
                //初始化head,同时tail指向head
                //从后面的代码可以看出头结点是傀儡节点,
                //每次通过判读线程节点的前驱是否是头结点来决定是否获取锁
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//将node插入到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

//acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {自旋
                //获取当前线程节点的前驱继节点
                final Node p = node.predecessor();
                //如果前驱是头节点,尝试获取锁
                if (p == head && tryAcquire(arg)) {//获取成功
                    //设置当前节点是头节点
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                //获取失败阻塞其他线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取失败
            if (failed)
                //取消正在进行的尝试获取
                cancelAcquire(node);
        }
    }

// shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
       //如果前驱节点状态是Node.SIGNAL直接返回,
       //Node.SIGNAL表示前驱获取锁后会唤醒后继节点的线程
        return true;
    if (ws > 0) {
        //pred.waitStatus表示Node.CANCELLED,下面的操作跳过状态为CANCELLED的所有节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //waitStatus 等于0(初始化)或PROPAGATE,
        //说明线程还没有park,会先重试确定无法acquire到再park,
        //这里主要是共享锁时传播共享状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this);
        //返回线程状态同时清除标记位,在acquire(int arg)中会再次中断状态的中断线程
        return Thread.interrupted();
    }

//cancelAcquire(Node node)
private void cancelAcquire(Node node) {
    //如果节点为null,直接返回
    if (node == null)
        return;
    node.thread = null;
    //跳过当前节点的所有前驱是CANCELLED状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    //标记节点状态为Node.CANCELLED
    node.waitStatus = Node.CANCELLED;
    //如果当期节点是尾节点则直接删除
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //下面的操作是如果当前线程的节点的前驱不能满足唤醒后继的条件,则删除当前节点,
        //满足则唤醒当前节点的后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

独占式释放锁

public final boolean release(int arg) {
    if (tryRelease(arg)) {//tryRelease(arg)方法子类重新
        Node h = head;
        if (h != null && h.waitStatus != 0) //head存在
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

通过上面连个方法我们可以了解独占锁获取释放锁的流程,除去独占锁外还有共享锁、带有超时时间的独占式和共享锁。共享锁的获取锁后唤醒后继节点传播共享,具体大家可以查看源码。

自己简单的理解:其实具体的流程很简单,首先创建一个节点,head和tail都指向这个节点,然后各个线程去获取锁,其实就是判断它的前驱节点是不是头结点同时还有tryAcquire()是否返回true。如果获取成功设置当前节点为头节点,获取失败则将其插入到同步队列,设置状态为SIGNAL同时阻塞当前线程,SIGNAL可以保证前驱节点释放后唤醒当前节点,当前节点可以放心阻塞。然后每个出去头结点的节点开始自旋尝试获取锁,直到获取到锁返回。

相关文章

网友评论

      本文标题:Java concurrent包源码走读(二)

      本文链接:https://www.haomeiwen.com/subject/ffgbnttx.html