美文网首页
JAVA-Lock解析-七-Semaphore解析

JAVA-Lock解析-七-Semaphore解析

作者: AlanSun2 | 来源:发表于2019-10-15 13:42 被阅读0次

    信号量 Semaphore 是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么 acquire 方法将一直阻塞,直到其它线程释放许可。

    Semaphore 通常用于线程对统一资源的访问控制。需要注意的是它没有实现 Lock 接口,也不支持 Condition。官方示例:

     class Pool {
       private static final int MAX_AVAILABLE = 100;
       private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
       public Object getItem() throws InterruptedException {
         available.acquire();
         return getNextAvailableItem();
       }
    
       public void putItem(Object x) {
         if (markAsUnused(x))
           available.release();
       }
    
       // Not a particularly efficient data structure; just for demo
    
       protected Object[] items = ... whatever kinds of items being managed
       protected boolean[] used = new boolean[MAX_AVAILABLE];
    
       protected synchronized Object getNextAvailableItem() {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (!used[i]) {
              used[i] = true;
              return items[i];
           }
         }
         return null; // not reached
       }
    
       protected synchronized boolean markAsUnused(Object item) {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (item == items[i]) {
              if (used[i]) {
                used[i] = false;
                return true;
              } else
                return false;
           }
         }
         return false;
       }
     }
    

    在获取项目之前,每个线程必须从信号灯获取许可,以确保可以使用该项目。线程完成该项目后,将其返回到池中,并向信号量返回一个许可,从而允许另一个线程获取该项目。

    Semaphore 内部使用了共享锁,没有独占锁和 ReentrantLock 正好相反,还有一个 ReadLock in ReentrantReadWriteLock 的区别是 Semaphore 可以在构造方法中指定共享锁的数量,ReadLock 只能是默认的 65535,而且大于 65535 还会抛出异常。Semaphore 在大于指定的共享锁数量时会阻塞。

    同样 Semaphore 有三个静态内部类 Sync,NonFairSync,FairSync。

    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
            
            //调用过程:
            //Semaphore#acquire-> NonFairSync#tryAcquireShared -> 本方法
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {//采用自旋直到 没有空闲的共享锁或 CAS 修改 state 的值成功
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
            
            //调用过程:
            //Semaphore#release -> AQS#releaseShared -> 本方法
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {//自旋直到 CAS 成功
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow,表示 releases 不能是一个负值
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
            
            //减少许可证的数量
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
            
            //把当前的可用的许可证清空
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    
        /**
         * 非公平锁实现
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         * 公平锁实现
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
            
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:JAVA-Lock解析-七-Semaphore解析

          本文链接:https://www.haomeiwen.com/subject/csyjmctx.html