美文网首页JDK源码
java.util.concurrent.Semaphore源码

java.util.concurrent.Semaphore源码

作者: sunpy | 来源:发表于2018-03-06 20:37 被阅读5次

    控制并发线程数Semaphore

    生活中,我们过桥,如果桥就能过3个人,那么一次就只能走三个人,如果多了,那么就会有人掉河里了。这就是Semaphore控制人数过桥。(而此处就是Semaphore控制只能有特定数量的线程访问指定资源)。

    继承与实现关系

    public class Semaphore implements java.io.Serializable
    

    Semaphore中的自定义的同步器

        /**
         * 
         * 自定义同步器继承AQS,使用AQS的状态state来控制同时访问的线程数(流量)
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
            //创建指定线程数访问的同步器构造器
            Sync(int permits) {
                setState(permits);
            }
            //获取允许线程同时访问的数量
            final int getPermits() {
                return getState();
            }
            
            //采用非公平的方式尝试获取共享状态下的同步状态值
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    //获取当前同步状态值
                    int available = getState();
                    //计算剩余的同步状态值
                    int remaining = available - acquires;
                    /**
                     * 如果剩余的同步状态值小于0或者当前的同步状态值为available
                     * 将当前同步状态值更新为remaining
                     */
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        //返回当前最新的同步状态值
                        return remaining;
                }
            }
            
            //采用共享方式释放同步状态值
            protected final boolean tryReleaseShared(int releases) {
                //死循环
                for (;;) {
                    //获取当前的同步状态值
                    int current = getState();
                    //计算如果释放同步状态值之后,得到的结果next
                    int next = current + releases;
    
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    //如果当前同步状态值为current,那么更新当前的同步状态值为next
                    if (compareAndSetState(current, next))
                        //返回true
                        return true;
                }
            }
    
            //减少并发线程的数量
            final void reducePermits(int reductions) {
                //死循环
                for (;;) {
                    //获取当前的同步状态值
                    int current = getState();
                    //计算出剩下的并发线程数
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    //如果当前同步状态值为current,那么更新当前的同步状态值为next
                    if (compareAndSetState(current, next))
                        return;
                }
            }
            
            //将并发的线程数量调整为0
            final int drainPermits() {
                //死循环
                for (;;) {
                    //获取当前的同步状态值
                    int current = getState();
                    /**
                     * 如果当前的同步状态值为0或者当前同步状态值等于current
                     * 那么将当前的同步状态值current更新为0
                     */
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    

    非公平同步器

        /**
         * 非公平同步器
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
            //非公平同步器构造器
            NonfairSync(int permits) {
                super(permits);
            }
            //在共享模式下尝试获取同步状态值
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    

    公平同步器

        /**
         * 公平同步器
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
            //公平同步器构造器
            FairSync(int permits) {
                super(permits);
            }
            //在共享模式下尝试获取同步状态值
            protected int tryAcquireShared(int acquires) {
                //死循环
                for (;;) {
                    //当前队列是否有前驱
                    if (hasQueuedPredecessors())
                        return -1;
                    //获取当前的同步状态值
                    int available = getState();
                    //计算出剩下的并发线程数
                    int remaining = available - acquires;
                    /**
                     * 如果剩下的并发线程数小于0或者当前同步状态值等于available
                     * 那么将当前的同步状态值更新为remaining
                     */
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        //返回计算出的最新同步状态值
                        return remaining;
                }
            }
        }
    
    

    构造器

        /**
         * 创建一个指定并发线程数的非公平同步器构造器
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * 创建一个指定并发线程数、是否公平的同步器构造器
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    

    方法

        /**
         * 创建一个指定并发线程数的非公平同步器构造器
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * 创建一个指定并发线程数、是否公平的同步器构造器
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
        /**
         * 在信号量Semaphore中获取一个许可
         * 在获取一个许可前线程将会阻塞,否则线程被中断
         * 整体许可数将减少1
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * 
         * 在信号量Semaphore中获取一个许可
         * 在获取一个许可前线程将会阻塞,虽然等待时被中断,但是仍然将继续等待
         * 整体许可数将减少1
         */
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
        /**
         * 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
         */
        public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
    
        /**
         * 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
         */
        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * 释放一个许可,将其返回给信号量
         */
        public void release() {
            sync.releaseShared(1);
        }
    
        /**
         * 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
         */
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
        /**
         * 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
         */
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }
    
        /**
         * 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可
         */
        public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
        /**
         * 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
         */
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }
    
        /**
         * 释放给定数目的许可,将其返回到信号量
         */
        public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    
        /**
         * 返回此信号量中当前可用的许可数
         */
        public int availablePermits() {
            return sync.getPermits();
        }
    
        /**
         * 获取并返回立即可用的所有许可
         */
        public int drainPermits() {
            return sync.drainPermits();
        }
    
        /**
         * 根据指定的缩减量减小可用许可的数目
         */
        protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }
    
        /**
         * 如果此信号量的公平设置为 true,则返回 true
         */
        public boolean isFair() {
            return sync instanceof FairSync;
        }
    
        /**
         * 查询是否有线程正在等待获取
         */
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        /**
         * 返回正在等待获取的线程的估计数目
         */
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
        /**
         * 返回一个 collection,包含可能等待获取的线程
         */
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
    

    应用例子

    public class SemaphoreTest {  
        private static final int PERSON_NUM=4;  
        private static ExecutorService es=Executors.newFixedThreadPool(PERSON_NUM);  
        private static Semaphore s=new Semaphore(3,true);  
        public static void release(Semaphore s,String name){  
            s.release();  
            System.out.println(name+"已经离开桥了!");  
        }  
        public static void main(String[] args) {  
            es.execute(new Runnable(){  
                @Override  
                public void run() {  
                    try {  
                        s.acquire();  
                        System.out.println("甲上桥了!");  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }      
            });  
            es.execute(new Runnable(){  
                @Override  
                public void run() {  
                    try {  
                        s.acquire();  
                        System.out.println("乙上桥了!");  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }      
            });  
            es.execute(new Runnable(){  
                @Override  
                public void run() {  
                    try {  
                        s.acquire();  
                        System.out.println("丙上桥了!");  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
            es.execute(new Runnable(){  
                @Override  
                public void run() {  
                    try {  
                        release(s,"甲");  
                        s.acquire();  
                        System.out.println("丁上桥了!");  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
    }  
    

    结果:
    甲上桥了!
    乙上桥了!
    丙上桥了!
    甲已经离开桥了!
    丁上桥了!

    解释:虽然线程池里面有甲乙丙丁四个线程准备过桥Semaphore,但是Semaphore只能让三个人过桥,所以,甲没离开桥上时,丁是无法上桥的,所以甲离开之后,丁就可以上桥了。


    ---------------------------该源码为jdk1.7版本的

    相关文章

      网友评论

        本文标题:java.util.concurrent.Semaphore源码

        本文链接:https://www.haomeiwen.com/subject/gsgrfftx.html