美文网首页程序员
[Java并发编程] 用 信号量(Semaphore) 实现一个

[Java并发编程] 用 信号量(Semaphore) 实现一个

作者: seaicelin | 来源:发表于2018-06-07 07:48 被阅读111次

    欲穷千里目,更上一层楼。—唐·王之涣《登颧雀楼》
    这句诗的意思是:想看到更远更广阔的景物,你就要再上一层楼。想学到更多更深的知识,你就要比原来更努力。

    PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~

    Semaphore,计数信号量,用来控制同时访问某个特定资源的线程数量,需要我们设定它的最大访问数量。 Semaphore 管理着一组虚拟许可,许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获取许可,并在使用后释放许可。如果没有许可,那么获取操作将阻塞直到有可用的许可。

    Semaphore 可以用于实现一个资源池,也可以将任何一个容器变成一个有界的阻塞容器,他在限制资源访问量上有很大的用处。

    Semaphore 的核心方法

    首先,我们先来看它的两个构造函数。

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    

    参数 permits 表示许可数量,即同时允许多少个线程访问。参数 fair 表示公平性,即等待越久越先获取到许可。

    其次,再来看一下它获取和释放许可的方法,信号量的核心用法就是下面这些。

    //获取一个许可
    public void acquire() throws InterruptedException {  }
    
    //获取permits个许可
    public void acquire(int permits) throws InterruptedException { }
    
       //释放一个许可
    public void release() { }    
         
    //释放permits个许可
    public void release(int permits) { }    
    
    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire() { };    
    
    //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false  
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  
    
       //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(int permits) { };
    
    //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; 
    
    

    使用场景

    前面说过,Semaphore 可以用于实现一个资源池。所以,我们用它来实现一个固定数量的消息池,只允许固定数量的线程同时访问。

    这个例子中消息池的数量为 3 个,信号量的许可数量也设置为 3 个,即用 Semaphore 来控制最多同时只能有三个线程使用,其中消息可以循环使用。

    如果已有三个线程已经获取到了消息,那么其他线程获取消息的时候将会阻塞,直到有线程释放消息,它才能获取到。获取消息和释放消息通过 Semaphore 的 acqure() 和 release() 方法进行控制,其中 Semaphore 的许可数量不应大于消息池的最大数量。

    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest{
    
    //表示消息池可用消息只有5个
    private static final int MAX_POOL_SIZE = 3;
    
    //获取消息的客户端的线程数量
    private static final int CLIENT_SIZE = 6;   
    
    //消息数组,存放所有消息
    private static Message[] messages = new Message[MAX_POOL_SIZE];   
    
    //信号量,许可数量为消息的最大可用数量
    private static Semaphore semaphore = new Semaphore(MAX_POOL_SIZE);
    
    //初始化消息数组
    static void  init() {
        for(int i = 0; i < MAX_POOL_SIZE; i++) {
            messages[i] = new Message();
        }
    }
    
    //同步方法,获取可用的消息
    static synchronized Message obtain() {
        Message msg = null;
        for(int i = 0; i < MAX_POOL_SIZE; i++) {
            if(messages[i].getFlag() == false) {
                msg = messages[i];
                msg.setId(i);
                msg.setFlag(true);
                return msg;
            }
        }
        return msg;
    }
    
    //同步方法,把用完的消息放回消息池
    static synchronized boolean release(Message msg) {
        if(msg.getFlag() == true) {
            msg.setFlag(false);
            msg.setId(-1);
            return true;
        }
        return false;
    }
    
    //用信号量控制能获取消息的数目
    static Message obtainMsg() throws InterruptedException {
        semaphore.acquire();
        return obtain();
    }
    
    //成功释放消息的同时释放信号量
    static void releaseMsg(Message msg) {
        System.out.print(Thread.currentThread().getName() + " ***Release msg id*** = "+ msg.getId() + "\n");
        if(release(msg)) {
            semaphore.release();
        }
    }
    
    public static void main(String[] args) {
        
        //初始化
        init();
        
        //创建子线程,获取消息
        for(int i = 0; i < CLIENT_SIZE; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //获取消息
                        Message msg = obtainMsg();
                        System.out.print(Thread.currentThread().getName() + " Obtain msg id = "+ msg.getId() + "\n");
                        //假装耗时操作
                        Thread.sleep(1000);
                        //释放消息
                        releaseMsg(msg);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }}).start();
        }
    }
    
    //声明一个消息类
    static class Message {
        private int id;         //表示每个消息的id
        private boolean flag;   //表示消息是否可用
    
        public Message() {
            this.id = -1;
            this.flag = false;
        }
        
        public void setId(int id) {
            this.id = id;
        }
        
        public void setFlag(boolean b) {
            this.flag = b;
        }
        
        public int getId() {
            return this.id;
        }
        
        public boolean getFlag() {
            return this.flag;
        }
    }
    
    }
    

    执行结果:


    这里写图片描述

    从这个结果看,线程 1,线程 2,线程 0 先获取到消息;接着线程 1 和 2 释放消息;释放消息后,那么此时消息池又有两个空闲消息,所以,线程 3 和线程 5 获取了消息;

    紧接着线程 0 释放消息,线程 4 立马获取了消息。。。

    这程序的执行结果和我们预期的流程一样。需要注意的点,Semaphore 是线程安全的。在这个例子中,不可能同时有 4 个线程能同时获取到消息。

    注意

    既然 Semaphore 是线程安全的,为什么上面两个方法需要添加同步?

    static synchronized boolean release(Message msg)
    static synchronized Message obtain()
    

    这里我们不能混淆概念,Semaphore 的线程安全是指同时只能有三个线程进入,即 acquire() 和 release() 必定线程安全。然而获取到许可后的操作不保证线程安全,所以这里加同步锁是为了确保获取消息的过程是安全的。

    另外一点需要注意,为什么下面两个方法不需要使用同步锁?

    static void releaseMsg(Message msg)
    static Message obtainMsg() throws InterruptedException
    

    细心的朋友可能已经知道,这里加上同步的话,会产生死锁。假如此时 acquire() 发生阻塞,那么obtainMsg() 一直持有同步锁,而 releaseMsg() 的时候必须等待同步锁的释放,这时必定陷入死锁,一直死等,然而没什么软用。
    这里不需要加同步锁,是因为我们要确保安全的内容是 获取许可集 后的数据安全,和释放许可集之前的数据安全。

    本文完结,如果觉得有帮助,请关注我哦,谢谢啦~

    相关文章

      网友评论

        本文标题:[Java并发编程] 用 信号量(Semaphore) 实现一个

        本文链接:https://www.haomeiwen.com/subject/xqswsftx.html