美文网首页Java并发编程线程
Java并发编程 - Semaphore(信号机)

Java并发编程 - Semaphore(信号机)

作者: HRocky | 来源:发表于2019-03-07 15:10 被阅读11次

    Semaphore在中文的书籍里面把它翻译成"信号量"。说实在话,在很长的时间我都无法从这个翻译上很直观得明白它到底是个什么东西。"信号的数量"? 什么鬼?!

    字典里的解释是: 臂板信号系统,(铁道)臂板信号装置。它是一种系统或装置,嗯呢,好像有点明白了。

    在学习与使用了这个类之后,我更喜欢将它翻译成——信号机或信号管理器。

    在编程世界里提出这个概念是用来解决对代码临界区进行访问控制的问题。临界区是受保护的,它里面包含了对敏感资源的操作,多线程环境下需要对这片区域加以控制,以保证安全地无误得操作资源。

    Semaphore是一个信号机,它可以产生信号(把它看作一个实体),线程得到信号就表示它有权进入临界区,当线程执行完临界区内的操作,退出临界区,信号会被回收以使得信号机可以将它重新分配给另一个想进入临界区的线程。

    Semaphore工作原理可以类比于下面的场景:

    Semaphore是澡堂管理员,它手上有有限数量的澡牌,每个要洗澡的人过来,首先需要到澡堂管理员这里来获取澡牌,只有得到澡牌才能进入澡堂洗澡,洗完澡后出来要把澡牌还给澡堂管理员;澡牌发完之后,后面来洗澡的人就需要在澡堂门口持续等待,直到澡堂管理员手上有回收回来的澡牌。

    类比这个场景,Semaphore中应该包含的逻辑,用伪代码可以按照如下书写:

    value; // 澡牌的数量
    
    // 请求澡牌
    acquire() {
        while value <=0 
            ;// 现在没有澡牌了,只有等了
         value--; // 澡牌的数量-1
    }
    
    // 归还澡牌
    release() {
        value++;// 澡牌的数量+1
    }
    

    Java语言提供了java.util.concurrent.Semaphore类,它是信号机机制的实现类。

    澡堂洗澡场景-1

    我们通过使用java.util.concurrent.Semaphore类来实现澡堂洗澡这个场景的模拟。

    Bathhouse.java

    import java.util.concurrent.Semaphore;
    
    public class Bathhouse {
        
        private static final Semaphore bathhouseManager = new Semaphore(10);// 澡堂管理员手上有10张澡牌
        
        public void bathe() {
            try {
                bathhouseManager.acquire();
                System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + ": 出澡堂...");
                bathhouseManager.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
        
        public static void main(String[] args) {
            Bathhouse bathhouse = new Bathhouse(); 
            
            for (int i=1; i<=100; i++) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        bathhouse.bathe();
                    }
                    
                }, "洗澡君" + i).start();
            }
        }
    
    }
    

    上面这句代码:

    private static final Semaphore bathhouseManager = new Semaphore(10);
    

    表示澡堂管理员手上只有10个澡牌,也就是表示只有10个澡位。

    运行代码我们可以发现,运行后立即会输出10条"在洗澡..."语句,这表示一次性可以同时有10个洗澡的人得到澡牌进入澡堂,后面会断断续续得输出。

    浴室洗澡场景-1

    你家来了10位朋友,但是浴室只有一个,假设你们正在玩扑克,有一张扑克代表是洗澡牌,抽到洗澡牌的朋友就可以去洗澡,洗完澡之后把洗澡牌归还,然后继续抽牌,抽到的朋友可以去洗澡,如此反复......

    还是使用java.util.concurrent.Semaphore类来模拟这个场景。

    Bathroom.java

    import java.util.concurrent.Semaphore;
    
    public class Bathroom {
        
        private static final Semaphore poker = new Semaphore(1);// 一副扑克里面只有一张洗澡牌
        
        public void bathe() {
            try {
                poker.acquire();
                System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + ": 出浴室...");
                poker.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
        
        public static void main(String[] args) {
            Bathroom bathroom = new Bathroom(); 
            
            for (int i=1; i<=10; i++) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        bathroom.bathe();
                    }
                    
                }, "朋友-" + i).start();
            }
        }
    
    }
    

    运行程序我们可以发现你的朋友是一个一个去洗澡的。

    浴室洗澡场景-2

    你家来了2个朋友A和B,由你规定只有A先洗完澡B才能去洗。模拟代码如下:

    Bathroom.java

    import java.util.concurrent.Semaphore;
    
    public class Bathroom {
        
        public static void main(String[] args) {
            
            final Semaphore poker = new Semaphore(0);
            
            // A线程
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        poker.acquire();
                        System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + ": 出浴室...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            }, "朋友-B").start();
            
            // B线程
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + ": 出浴室...");
                        poker.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            }, "朋友-A").start();
            
        }
    
    }
    

    运行结果,输出的结果是我们预期的。

    上面的代码我们可以看到acquire()和release()是分开到两个线程内部的。这样分开后就可以实现朋友A先洗朋友B后洗的需求。

    回到代码中来:

    final Semaphore poker = new Semaphore(0);
    

    这里按照我们刚才的说法是一张洗澡牌都没有,朋友B去请求获取洗澡牌的时候不会得到洗澡牌,所以就算朋友B首先提出了要先洗,也是洗不了的。但是你的朋友A很特殊,她是你的女朋友,她不用向你申请洗澡牌就可以去洗澡,而且她希望澡之后还获得了一张洗澡牌,她给了朋友B,朋友B就可以去洗澡。

    上面的说明可能有点牵强,这里本来不是介绍java.util.concurrent.Semaphore的源码实现的,不过为了对这个例子进行解释,还是来进行简单的说明。

    java.util.concurrent.Semaphore内部维护了一个变量(在AQS类中定义的,这里我们就简单地理解为是它维护的):

    private volatile int state;
    

    表示Semaphore这个信号机内部拥有的信号的数量(哈哈,信号量)。

    acquire()方法调用会申请获取一个信号,

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    

    如果信号量不够(我们这里就是state=0),也就是说state-1< 0,那么信号获取失败,线程被加入到等待队列中。

    这里一定要注意:信号量不够不一定是指state=0,而是信号机没有那么多信号给你,比如说你申请3个,但是现在state=1,只有一个信号,无法满足你的申请需求。所以程序中判断是否能够满足你的申请要求是通过【state-你的申请量】是否大于0来决定的。

    acquire()方法是无参方法,默认就申请一个信号。

    上面线程B先运行,调用acquire() 方法的时候因为当前的state=0,所以无法获取到信号,被加入到等待队列中。

    线程A执行到release()方法,会释放一个信号,那么此时state=1,并且会唤醒等待队列中等待的B线程。

    public void release() {
        sync.releaseShared(1);
    }
    

    B线程被唤醒继续执行,这时候因为state=1了,有了1个信号,而B请求的也是1个信号,刚好满足,朋友B就能去洗澡了。

    澡堂洗澡场景-2

    为了加深对java.util.concurrent.Semaphore的使用,我们继续来看一个场景。

    场景:有个暴发户去洗澡,他要包场,不过他提前没打招呼,他过来洗的时候有人已经进去洗了,所以他要等里面的人全洗完,再去洗。

    也就是说暴发户要一次性请求10张洗澡牌,请求成功后他才进去洗。

    Bathhouse.java

    import java.util.concurrent.Semaphore;
    
    public class Bathhouse {
        
        public static void main(String[] args) throws InterruptedException {
            
            Semaphore bathhouseManager = new Semaphore(10);
             
            // 线程A
            Thread threadA = new Thread(new Runnable(){
    
                @Override
                public void run() {
                    try {
                        
                        Thread.sleep(2000);// 避免暴发户一开始就能拿到10张澡牌
                        
                        boolean flag = false;
                        
                        while (!flag) {
                            flag = bathhouseManager.tryAcquire(10);
                            System.out.println("暴发户在等澡牌...");
                        }
                        
                        System.out.println("***暴发户进入洗澡了...***");
                        Thread.sleep(2000);
                        System.out.println("***暴发户出澡堂了...***");
                        
                        /**
                         * 暴发户只归还了一张澡牌
                         */
                        bathhouseManager.release(1);
                        
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            });
            threadA.start();
            
            for (int i=1; i<=10; i++) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            bathhouseManager.acquire();
                            System.out.println(Thread.currentThread().getName() + ": 在洗澡...");
                            Thread.sleep(3000);
                            System.out.println(Thread.currentThread().getName() + ": 出澡堂...");
                            bathhouseManager.release();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                }, "洗澡君-" + i).start();
            }
            
            threadA.join();
            
            
            System.out.println("***暴发户拿走了" + (10 - bathhouseManager.availablePermits()) + "张澡牌...***");
            
        }
        
    
    }
    

    运行上面的程序,输出的结果可以保证:所有的洗澡君洗完之后,暴发户才会去洗,否则他会一直等待洗澡牌满10张。

    ***暴发户进入洗澡了...***
    ***暴发户出澡堂了...***
    ***暴发户拿走了9张澡牌...***
    

    这三行在最后末尾输出。

    我们上面通过相关的场景讲的是java.util.concurrent.Semaphore这个信号机的使用,也说明了一下基本的原理,但是有些关于实现的问题没有深入源码进行讲解。

    比如说请求获取信号失败后当前线程怎么办?

    对于这个问题,其中的一种解决办法是让当前线程一直循环获取,也就是说自旋,这样不用挂起线程,然后再唤起线程,导致不必要的上下文切换的开销。不过自旋表示线程还在运行,这样还是会占用CPU的,如果其他获取到信号的线程执行任务的时候不是很长,也就是说能很快地能从临界区中退出,那么自旋效果还是挺好的。

    还有另外一种解决办法就是在信号机中维护一个队列,将信号获取失败的线程放入到队列中以等待下次唤醒。这就好比没有澡牌后,澡堂管理员将要洗澡的人带到休息等待区,等有澡牌后就去叫他们,将澡牌再分配。当然了,这时候可能有非等待区的人到来了,这个时候澡牌如何分配,是先来后到,还是懒得去叫等待区的人,而是让新来的人进去洗,这就又涉及到一个公平和非公平分配的问题了,这个是后话了。

    java.util.concurrent.Semaphore基于java.util.concurrent.locks.AbstractQueuedSynchronizer,这里我们不会深入源码,会有专门的文章讲解AQS,把AQS讲通了,信号机内部具体是怎么实现的也就清楚了。

    好了,最后通过JDK API中对Semaphore作用的描述来结束本篇文章:

    Semaphore(信号机)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

    相关文章

      网友评论

        本文标题:Java并发编程 - Semaphore(信号机)

        本文链接:https://www.haomeiwen.com/subject/eqdidqtx.html