美文网首页并发编程
信号量-semaphore -05

信号量-semaphore -05

作者: 愤怒的奶牛 | 来源:发表于2019-08-17 14:35 被阅读0次

    并发中多个线程访问同一个资源可以通过同步解决,但是多个线程访问多个资源怎么解决呢?如公交车上有10个位置,但是现在有15个人,要控制这15个人来抢占位置。juc 引入semaphore 来解决这个问题。

    /**
     * 信号量:控制多线程对多个资源的访问
     */
    public class SemaphoreDemo {
    
        //设置10个许可,这个和线程池不一样,信号量里面并没有创建线程,而是通过 所谓的许可证(可理解为 token )来控制
        // 拿到 token 的就可以访问资源,到 token 被拿完了,就会被放入等待队列
        private static final Semaphore semaphore = new Semaphore(10);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i=0;i<20;i++) {
                new Thread(()->{
                    try {
                        semaphore.acquire();//获取执行许可
                        System.out.println("执行线程=" + Thread.currentThread().getName());
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();//释放
                    }
                }).start();
            }
        }
    }
    
    jkaj4kz8mm.png

    常用api

            int availablePermits = semaphore.availablePermits();//可用 token
            int drainPermits = semaphore.drainPermits();//获得并返回所有立即可用的许可证数量
            int queueLength = semaphore.getQueueLength();//返回当前可能在阻塞获取许可证线程的数量
            boolean hasQueuedThreads = semaphore.hasQueuedThreads();//查询是否有线程正在等待获取许可证
            boolean fair = semaphore.isFair();//返回是否为公平模式
            boolean tryAcquire = semaphore.tryAcquire();//非阻塞的获取一个许可证(token)
    

    下面我们来稍微分析一下semaphore 的源码 。

    //构造器,传入token 数量
      public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    //构造器,传入token 数量,是否是公平锁、否公平锁
      public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
    • NonfairSync
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
          //父类构造器
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
    • Sync
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
             // 构造器
            Sync(int permits) {
               // 使用 state 字段来作为 token 数量,在往上面就是 AQS ,暂时到这里。
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    

    通过上面的源码分析,我们看出 在我们 private static final Semaphore semaphore = new Semaphore(10); 时,设置了一个 state 字段来记录 token 的数量,也就时许可证的数量。下面我们来分析一下 semaphore.acquire(); 这个方法的实现。也就是现在我们的token 有了,就等 线程来取了, semaphore.acquire(); 就时获取token。

    • semaphore.acquire()
    public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        /** All mechanics via AbstractQueuedSynchronizer subclass */
        private final Sync sync; 
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
        public void acquire() throws InterruptedException {
           // 进去,该方法在 AQS
            sync.acquireSharedInterruptibly(1);
        }
    }
    
    • AQS
    // arg == 1
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())//判断当前线程是否中断
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)// tryAcquireShared(arg) 看一下这个方法
                doAcquireSharedInterruptibly(arg);
        }
    
    • tryAcquireShared
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    // 因为前面我们初始化时选择的 是NonfairSync  ,所以具体实现在这里 acquires = 1
           protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);// 看一下这个方法
            }
    }
    
    • nonfairTryAcquireShared
    // Semaphore 内部类
       abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState(); // 我们初始化时 ,这个值 是10
            }
    
           // 这个方法就 线程来 获取token 的逻辑 acquires == 1
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {// 自旋
                    int available = getState(); //  如果是第一个线程来,这个值就是初始值,否则就是 剩下的 token 数量。
                  // 总的数量 - 1 ,每个 线程来都取走一个
                    int remaining = available - acquires;
    // 这里出 for 有两个条件,remaining < 0 表示 token 数量没有了,
    //试想一下,初始化 10 个token ,共有15 个 ,每来一个线程 就 减1,第10 个的时候 remaining  = 0 ,
    //这时刚好把token 用完,第11 个来时 就变成了 0-1 = -1 ,remaining = -1 ,并返回。
    //在没有取完token 时,remaining < 0 这个条件不会满足。
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))// 第二个条件,CAS ,也就每次取走一个token ,就更新 state 的值,也就说剩下的token 数量。返回的是 10-1 = 9 (剩下的数量)
                        return remaining;
                }
            }
    
    • compareAndSetState
    protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this, 这句就调用的 unsafe 对象 ,本地方法栈里面的。
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    
    • unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long stateOffset;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long waitStatusOffset;
        private static final long nextOffset;
    
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    
      protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    }
    

    unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    this :需要更新字段所在的对象
    stateOffset : 字段 --> state
    expect : 期望值
    update : 更新值
    更新之前都会 比较 传入的 expect 值是否和原来的值 相同,相同就 更新。如 当前 state =10 ,expect = 10,update =9 ,则 state = 9。

    到这里我们就指定了 在 semaphore 信号量中,上层 semaphore.acquire(); 方法就是将底层 AQS 中的 state 值做修改,而所谓的 获取 执行许可(token )就是将 state 的值 -1 ,直到 取完为止。那么 如果 state = 10 ,但是现在有 15 个线程需要取,剩下5个没有获取成功的 线程又去哪里了呢?下面分析下这个逻辑:

     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)//  tryAcquireShared(arg) 这个方法关键。
                doAcquireSharedInterruptibly(arg);
        }
    

    tryAcquireShared(arg) : arg = 1 。前面我们说到这个方法 nonfairTryAcquireShared 里面是一个 自旋(for(;;)),里面有个if 条件判断,有两种 条件可以进去,下面我们再来看一看代码:

     final int nonfairTryAcquireShared(int acquires) {
                for (;;) {// 自旋
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 || // 条件 1 
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    remaining < 0 。分析一下这个条件 什么时候满足:

    试想 : state = 10 ,总共有15 个线程需要获取 许可(token),
    当前面 10 都获取 成功后 state = 0,这是第11 个 过来了,
    int available = getState(); // available = state = 0
    int remaining = available - acquires; // 0-1 , remaining = -1 < 0。
    将 return -1。

    回到 :

      public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0) // 第11 个线程 满足 < 0 的条件。
                doAcquireSharedInterruptibly(arg);
        }
    
    • doAcquireSharedInterruptibly(arg); arg = 1
       /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // addWaiter() 在队列里面 添加一个节点。新加入的节点再队尾,返回 的是 上一个节点,
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {// 自旋,依次 去队列里面的前一个,
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg); // 再一次尝试 获取 
                        if (r >= 0) { //获取成功,节点被剔除队列。
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    final Node node = addWaiter(Node.SHARED); 在 队列里面添加一个节点,也就说 没有获取到 许可(token) 的线程 被 包装在一个 Node 对象里面,并 添加到了一个队列里面。

    • addWaiter(Node node)
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    到这里我们就知道了,semaphore 中 获取 token 就是每次 将 state 的值 -1 ,当 state 的值 >=0 时 表示 线程有获取资源的权力,当 state 被消耗完了,下一个线程再来的时候,就会被放到一个队列里面,等待下次获取。

    • enq(node);
    // 入队操作
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;// 队尾
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t; // 当node 的 前一个 节点 设置为 t 
                    if (compareAndSetTail(t, node)) { //设置 当前node 为 队尾
                        t.next = node; // 以前的 队尾 的下一个 节点 设置为 当前节点,当前节点 就成了对尾 
                        return t; //   Node t = tail;// 队尾
                    }
                }
            }
        }
    

    相关文章

      网友评论

        本文标题:信号量-semaphore -05

        本文链接:https://www.haomeiwen.com/subject/jdmhsctx.html