美文网首页
17.多线程总结(五)-并发工具类-Semaphore(信号量)

17.多线程总结(五)-并发工具类-Semaphore(信号量)

作者: 任振铭 | 来源:发表于2020-04-14 17:03 被阅读0次

Semaphore是一个计数信号量,可以维护当前访问自身的线程个数,并提供了同步机制。常用于限制可以访问某些资源的线程数量,使用场景:接口限流

如下示例代码,设置初始值为5,表示同时只能最多5个线程访问,我们在下边开启了8个线程,通过打印结果可以看出,启动之后立即有5个线程开始执行,4s之后,这5个线程release掉,其他三个线程才开始执行,可以表明Semaphore可以控制线程访问数量的结论

public class TestSemaphore {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 8; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"开始执行"+" time="+System.currentTimeMillis()/1000);
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            }).start();
        }
    }
}
打印结果:
Thread-0开始执行 time=1586484962
Thread-1开始执行 time=1586484962
Thread-2开始执行 time=1586484962
Thread-3开始执行 time=1586484962
Thread-4开始执行 time=1586484962

Thread-5开始执行 time=1586484966
Thread-7开始执行 time=1586484966
Thread-6开始执行 time=1586484966

源码

Semaphore(5)

从构造方法中可以看出,Semaphore有两种实现,公平和非公平,都是通过继承Sync实现,而Sync又是继承了AbstractQueuedSynchronizer,可以发现目前遇到这几个并发工具类都和AbstractQueuedSynchronizer有联系

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            //同样是维护了一个state值,作为计数使用
            setState(permits);
        }
    }
    protected final void setState(int newState) {
        state = newState;
    }
acquire()
    public void acquire() throws InterruptedException {
        //这个实现和CountdownLatch是一样的
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果中断抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取,这个实现不同于CountdownLatch,而是在Sync中有自己的实现
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared的默认非公平实现就是nonfairTryAcquireShared方法,可以看到又是一个for无限循环,而退出循环的条件是我们需要关注的if (remaining < 0 ||compareAndSetState(available, remaining)),由于我们上边示例中设置的state=5,所以可以看到这里的操作就是拿到这个state,然后将它减1,判断是不是小于0(小于0表示满足已经有state个线程调用了acquire方法的条件),假设第一次进来,那么-1后remaining=4,判断<0不满足,然后通过CAS的方式将state值更新为remaining,然后返回remaining=4,跳出循环,如果没有更新成功就进入下一次循环。

这里有点容易混淆,当第一个线程进来的时候,返回的remaining=4,第二个线程进来,返回的remaining=3,也就是线程和remaining的对应关系为
线程 == remaining
1 ====== 4
2 ====== 3
3 ====== 2
4 ====== 1
5 ====== 0

要理解,第state个线程来的时候,返回的remaining正好是0,state+1的时候,remaining才是-1

       final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

我们再回到上边的方法acquireSharedInterruptibly中,由于if (tryAcquireShared(arg) < 0)不满足,所以方法直接返回了,doAcquireSharedInterruptibly没有被执行。知道第6个线程调用到acquire方法的时候,此时满足remaining=-1 <0 的条件,开始执行doAcquireSharedInterruptibly方法,其实也就是前5个线程都被放行了,到了第6个,需要进入等待状态

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //见过很多次了,ReentrantLock中一样,将线程节点加入链表并返回节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //拿到这个节点的前节点
                final Node p = node.predecessor();
                //如果前节点就是head,说明这个节点就是等待队列中的唯一一个
                //我们假设p == head 成立
                if (p == head) {
                    //判断state值,如果r >= 0说明能容纳的线程还没有满,如果r=0,
                    //那么正好说明当前线程是第state个线程,则不用休眠
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //移除这个线程节点,并唤醒某沉睡的线程node
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //走到这里说明r >= 0不满足,说明已经有state个线程在执行中,当前
                //线程是第state+1个,所以将这个线程挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

总结一下acquire方法,它的作用就是让前state个线程通过,超过state的线程休眠等待

release()
public void release() {
        sync.releaseShared(1);
    }

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared将state值不断的加1,我们假设某一线程在调用release方法的时候,5个线程都已经acquire过了,也就是说,此时的state=0(当第6个线程调用acquire方法的时候,由于remaining < 0的条件满足,所以不会再给state赋值,而是直接返回了remaining,也就是说state值最小为0)。那么+1之后就是1,也就是说,next可以看作记录的是有多少个线程释放了,每次release之后都会返回true,然后执行后边的doReleaseShared

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                //这里什么情况下+上一个数反而比它本身小?只有一种情况
                //state值等于int型最大值,再加1就越界,返回-1,-1<current
                //这是一个边界检查的处理
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

doReleaseShared的作用就是释放一个唤醒一个,这样就可以保证,在线程充足的情况下,一直有5个线程在运行

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

相关文章

网友评论

      本文标题:17.多线程总结(五)-并发工具类-Semaphore(信号量)

      本文链接:https://www.haomeiwen.com/subject/cfmlmhtx.html