美文网首页程序员
Semaphore源码解读

Semaphore源码解读

作者: 程序员cookie | 来源:发表于2020-11-10 17:43 被阅读0次

关键字:AQS、自旋、CAS、LockSupport、CLH阻塞队列

1. AQS

Semaphore的相关操作主要由其内部成员变量sync完成,sync有两种,分别是支持公平锁的FairSync和不公平锁的NonfairSync,两种都是基于AQS扩展而来。我们在声明一个信号量对象的时候,sync便在构造函数里被初始化。这里先简单介绍以下AQS,后续会出一篇文章详细解读。

AQS全名为AbstractQueuedSynchronizer,即抽象队列同步器,是并发包作者Doug Lea为了解决在Java 1.5之前synchronized性能问题而开发的并发框架,主要实现有ReentrantLock,ReentrantReadWriteLock, CountDownLatch, Semaphore等,和synchronized对标的便是ReentrantLock。

AQS内维护了一个volatile类型的int 成员变量state,以及一个双向CLH队列,线程尝试修改state属性值,修改成功便表明成功获取锁,否则进入CLH队列并阻塞,直到持有锁的线程释放,并唤醒CLH队列中的线程。

2. 初始化信号量

我们在初始化Semaphore的时候,便指定了state的值,表明可以获取的最大信号量,线程尝试获取信号量即对state减去相应的值,修改成功便表明成功获取信号量,否则进入CLH队列并阻塞,直到持有信号量的线程释放,state加1,并唤醒CLH队列中的全部线程。

sync由构造函数进行初始化


//构造permits个数量的不公平锁
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

//根据fair构造permits个数量的公平&不公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
    

3. 加锁操作(获取信号量)

Semaphore提供了8中常用的加锁操作,可分为三大类,即获取一定数量的共享锁&是否支持中断&获取不到是否阻塞,以下8中操作便是其两两组合。

//通过sync获取共享锁(可中断)
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//通过sync获取共享锁(不可中断)
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

//尝试获取1个共享锁,获取不到则立刻返回false,不进行阻塞    
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
    
//尝试获取1个共享锁,获取不到则等待timeout时间后返回false  
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
 
//尝试获取permits个共享锁,获取不到则立刻返回false,不进行阻塞   
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

//尝试获取permits个共享锁,获取不到则等待timeout时间后返回false  
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

先来看一下获取可中断锁


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //线程是否被中断,中断则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1.首先尝试获取共享锁
    // 2.获取成功则进行相应的业务逻辑,获取失败进入阻塞队列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

获取信号量锁时又分为公平锁和不公平锁,以下分别是两种锁是如何获取的


static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //自旋
        for (;;) {
            //阻塞队列中是否已经有节点在等待,如有则直接返回获取失败
            //这个判断就是和不公平锁的区别,不公平锁不管队列中是否有节点等待,上来就抢锁
            if (hasQueuedPredecessors())
                return -1;
            //通过CAS设置state
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //调用父方法
        /*
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
        */
        return nonfairTryAcquireShared(acquires);
    }
}

获取锁失败请求入队

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //将当前节点放入阻塞队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            //获取当前节点的上一节点
            final Node p = node.predecessor();
            //上一节点如果是队头
            if (p == head) {
                //再次尝试获取args数量的共享锁,r为剩余的共享数量
                int r = tryAcquireShared(arg);
                //获取成功
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //上一节点如果不是队头,即阻塞队列中已经有节点在等待或者是队头但获取锁失败则执行以下逻辑
            //1.将当前节点的有效前驱节点标示为可唤醒状态
            //2.将当前节点阻塞,等待被唤醒或中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
    }
//获取当前节点的上一个节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}


/*
 *
 * pred 上一个节点
 * node 当前节点
 * CANCELLED =  1;SIGNAL    = -1;CONDITION = -2;PROPAGATE = -3;
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //上一个节点已经处于可唤醒状态则直接返回
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
         //节点已失效,将失效节点的前驱节点赋值为当前节点的前驱节点,直到前驱节点不存在已经取消的情况
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //通过CAS将有效的前驱节点的状态修改为可唤醒状态 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

private final boolean parkAndCheckInterrupt() {
    //线程阻塞在这里
    LockSupport.park(this);
    //线程被唤醒时从这里开始执行
    return Thread.interrupted();
}

现在来对比分析一下尝试获取(tryAcquire)、不可中断、最大尝试时间分别是如何处理的

tryAcquire

//尝试获取,获取不到直接返回了
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

设置最大获取时间

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //注意这里的队头只是一个虚拟节点,真正存放线程的节点为队列的第二个节点,以下提到的队头同样
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //区别在这里,线程只会park一定时间,过期后再次尝试获取失败便直接返回
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

不可中断锁


private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //和可中断锁区别在这里,可中断这里直接抛出异常了,但是不可中断锁只是设置一个值便又去获取了
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4. 解锁操作

//调用sync的releaseShared方法进行解锁,每次解锁数量为1
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//通过自旋+CAS将共享锁的数量加回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //CLH队列不为空,即有线程在等待获取锁
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒头节点的next节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //刚才被唤醒的线程已将head设置为head的下一节点,所以这里不会相等
        //所以这里一般会多唤醒一次,假如多唤醒的节点获取到锁,重复此逻辑,否则多唤醒的节点会继续阻塞
        if (h == head)                   // loop if head changed
            break;
    }
}

这里再贴一下节点被唤醒时的逻辑

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
            //线程被唤醒后从这里再次执行    
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

相关文章

  • Semaphore源码解读

    关键字:AQS、自旋、CAS、LockSupport、CLH阻塞队列 1. AQS Semaphore的相关操作主...

  • 8.AQS共享锁

    一、Semaphore实现原理解析 1.1Semaphore实例 1.2Semaphore源码解析 new Sem...

  • python--线程semaphore

    源码: tests/semaphore.py 测试: tests/main.py

  • python--线程BoundedSemaphore

    源码: tests/bounded_semaphore.py 测试: tests/main.py

  • Semaphore 源码分析

    Semaphore 源码分析 1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅...

  • Semaphore 源码分析

    需要提前了解的知识点: AbstractQueuedSynchronizer 实现原理 类介绍 Semaphore...

  • Semaphore源码分析

    1. Semaphore 定义 Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过 一个 pe...

  • Semaphore源码分析

    整体概况 Semaphore是借助AQS实现的的共享锁,通过构造参数可以给状态变量赋值,用来控制对资源访问的并发度...

  • Semaphore源码分析

    1.概述 这篇文章主要用来介绍Semaphore源码。应该也是最后一篇了。还有一个chm感觉有些地方还有点问题,就...

  • AFN 3.0学习总结(最后的总结 转载)

    AFNetworking 3.0 源码解读 总结(干货)(上) AFNetworking 3.0 源码解读 总结(...

网友评论

    本文标题:Semaphore源码解读

    本文链接:https://www.haomeiwen.com/subject/kybjbktx.html