美文网首页
从Semaphore使用到AQS源码学习

从Semaphore使用到AQS源码学习

作者: 1010100 | 来源:发表于2019-10-29 21:19 被阅读0次

    Semaphore简介

    Semaphore信号量,主要作用就是做资源流控,举例饭店一共可以容纳5个人吃饭,当饭店中已经进入了5个人,那么第6个人只能在外面等待里面的人吃完之后才能就餐.Semaphore 就好像是饭店的招待.穗人员的进出进行登记管理.

    Semaphore 简单使用

        //两个信号指示器,分别表示饭店还有多少就餐位置,和已经就餐的人数
    private final Semaphore kong = new Semaphore(5); //空余位置5个
    private final Semaphore person = new Semaphore(0); //正在就餐的人数    
        //存放就餐工具的池子(餐具,桌椅)
    private static LinkedList<EatTool> pool = new LinkedList<EatTool>();
    
    static {
        System.out.println("饭店开张喽!!!");
        for (int i = 0; i < 5; i++) {
            pool.addLast(new EatTool());
        }
    }
    
       /*吃饱了*/
    public void returnEatTool(EatTool eattool) throws InterruptedException {
        if(eattool!=null) {
            person.acquire();
            synchronized (pool) {
                pool.addLast(eattool);
            }
            kong.release();
        }
    }
    
    /*准备就餐*/
    public Eattool takeEatTool() throws InterruptedException {
        kong.acquire();
        Eattool eattool;
        synchronized (pool) {
            eattool = pool.removeFirst();
        }
        person.release();
        return eattool;
    }
    

    特别说明的就是之所以声明两个信号量就是因为Semaphore在没有资源被使用的时候,有人调用release方法也是会是state状态计数累加,也就是凭空有可能多出很多的桌椅,这样就不能保证餐厅的高效运行,资源不能合理的利用

    Semaphore原理分析

    看源码我们可以清楚的知道Semaphore是使用AQS实现的 ,AQS是基于双向链表和Syn类(掌控同步锁的细节)
    这里就贴一段源码进行分析

        /**
        * Synchronization implementation for semaphore.  Uses AQS state
        * to represent permits. Subclassed into fair and nonfair
        * versions.
        */
        abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    
        Sync(int permits) {
            setState(permits);
        }
    
        final int getPermits() {
            return getState();
        }
    
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
    
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    
    /**
     * NonFair version(非公平锁的实现)
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
    /**
     * Fair version(公平锁的实现)
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    这里就是Syn锁的全部细节.
    在我们的实际使用的过程中最基本的方法就是这两个方法了accquire()和release()了.一个负责请求(我要吃饭).一个负责释放(我吃饱了)


    image.png

    先去吃饭accquire()

    一层层进入源码我们就不难发现

    if (tryAcquireShared(arg) < 0) //尝试请求  state计数-1
            doAcquireSharedInterruptibly(arg); //失败后添加到等待队列中
    }
    

    核心的部分也就是这一段了
    tryAcquireShared() 就是我们根据AQS中提供的模板方法实现的细节getState()中是读取AQS类中的State记录的状态,当state中储存的int值小于零的时候请求的线程将在doAcquireSharedInterruptibly()准备进行阻塞,这里用到是对AQS中的双向节点列表进行的操作了,假如state的值大于零表示饭店还有位置可以就餐.这里用到了一些AQS的操作高效的保证线程安全,不了解的童鞋自行补课吧.

    吃饱了release()

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { //尝试释放资源 state 计数+1
            doReleaseShared(); //通知等待队列  可以释放锁了
            return true;
        }
        return false;
    }
    

    简单的解析就是正在就餐的人就餐完毕后会通过release()方法通知后面等待的人来就餐.

    最后

    使用AQS我们可以开发出各式各样的满足的我们要求的锁 AQS的理解还是很重要的,本文写很肤浅,部分逻辑和描述不是很清楚,笔者也是锻炼一下表达能力.具体细节请尊重源码吧.千言万语源码才是最好的说明文档.

    相关文章

      网友评论

          本文标题:从Semaphore使用到AQS源码学习

          本文链接:https://www.haomeiwen.com/subject/uongvctx.html