美文网首页
J.U.C锁之 Semaphore

J.U.C锁之 Semaphore

作者: 贪睡的企鹅 | 来源:发表于2019-07-10 16:43 被阅读0次

Semaphore 简介

Semaphore 名为"信号量"。

Semaphore用来管理内部许可证,当多个线程要访问竞争资源时可以通过Semaphore来控制并发访问竞争资源的线程数。当线程需要访问竞争资源时需要首先获取一个许可证,执行完毕在返还,如果许可证用完则,线程进入同步队列并阻塞。等待许可证返回唤醒。

主要特性

公平性:支持公平性和非公平性。所谓公平表示在获取锁时逻辑是否要考虑当前正在排队等待线程。按照大白话来说就时公平表示不能插入强占资源。

应用场景

常用函数

获取许可

/** 获取一个许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** 和acquireUninterruptibly功能,同时能响应中断  **/
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** 和acquireUninterruptibly功能,同时增加阻塞超时  **/
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** 尝试获取许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /** 获取指定数量的许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /** 和acquireUninterruptibly(int )功能,同时能响应中断  **/
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /** 和acquire(int)功能,同时增加阻塞超时  **/
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }


    /** 尝试指定数量的许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

释放许可

/** 释放一个许可, **/
public void release() {
    sync.releaseShared(1);
}


/** 释放指定数量许可, **/
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

其他方法

 /**
     * 获取许可的数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }


    /**
     * 清空许可
     */
    public int drainPermits() {
        return sync.drainPermits();
    }


    /**
     * 根据指定的缩减量减小可用许可的数目
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 判断当前对象是否是公平信号量
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }


    /**
     * 判断是否有线程在同步队列等待许可
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    /**
     * 获取等待许可的线程数量
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }


    /**
     * 获取等待许可的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

2 实现原理

Semaphore 使用AQS实现锁机制,AQS是AbstractQueuedSynchronizer的缩写,翻译过来就是"同步器",,它实现了Java函数中锁同步(synchronized),锁等待(wait,notify)功能。

AbstractQueuedSynchronizer是一个抽象类,我们可以编写自己类继承AQS重写获取独占式或共享式同步状态模板方法,实现锁锁同步(synchronized),锁等待(wait,notify)功能

2.1 AQS 实现原理

AQS核心是一个同步状态,两个队列。它们实现了Java函数中锁同步(synchronized),锁等待(wait,notify),并在其基础上实现了独占式同步,共享式同步2中方式锁的实现。

无论独占式还时共享式获取同步状态成功则直接返回,失败则进入CLH同步队列并阻塞当前线程。当获取同步状态线程释放同步状态,AQS会选择从CLH队列head头部节点的第一个节点释放阻塞,尝试重写竞争获取同步状态,如果成功则将当前节点出队。如果失败则继续阻塞。

获取同步状态的线程也可以使用condition对象释放同步状态进入等待队列。只有等待其他线程使用condition.signal或condition.signAll()唤醒被从阻塞状态中释放重新竞争获取同步状态成功后从原来指令位置继续运行。

2.1.1 同步状态

AQS实现了锁,必然存在一个竞争资源。AQS存在从一个int类型的成员变量state,我们把它称为同步状态,同步状态通常用做判断线程能否获取锁的依据

2.1.2 同步队列

AQS 实现了锁那么总需要一个队列将无法获取锁的线程保存起来,方便在锁释放时通知队列中线程去重新竞争锁。

实现原理
同步队列又被称为CLH同步队列,CLH队列是通过链式方式实现FIFO双向队列。当线程获取同步状态失败时,AQS则会将当前线程构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态被释放时,会把首节点后第一个节点的线程从阻塞状态下唤醒,唤醒的线程会尝试竞争同步状态,如果能获取同步状态成功,则从同步队列中出队。

image
2.1.3 Condition & 等待队列
  • Java 传统的监视器有如下函数 wait、notify、notifyAll。它们可以实现当一个线程获取锁时,它可以主动放弃锁进入一个条件队列中。只有其他线程通知时才从条件队列中出队,重新获取锁成功后继续执行之前的未完成代码逻辑。

  • AQS内部存在一个内部类实现了Condition接口,其内部维护着一条链式实现单向等待队列。我们可以使用AQS获取内部实现Condition接口对象,调用await(),signal(),signalAll()函数实现Java中wait、notify、notifyAll同样功能。

实现原理

  • 当获取同步状态的线程调用condition.await(),则会阻塞,并进入一个等待队列,释放同步状态.
  • 当其他线程调用了condition.signal()方法,会从等待队列firstWaiter开始选择第一个等待状态不是取消的节点.添加到同步队列尾部.
  • 当其他线程调用了condition.signalAll()方法,会从等待队列firstWaiter开始选择所有等待状态不是取消的节点.添加到同步队列尾部.

这里取消节点表示当前节点的线程不在参与排队获取锁。

image
2.1.4 独占式同步

从概念上来说独占式对应只存在一个资源,且只能被一个线程或者说竞争者占用.

2.1.5 共享式同步

从概念上来说共享式对应存在多个资源的是有多个线程或者竞争者能够获取占用.

2.2 模板方法

我们可以编写自己类继承AQS选择重写独占式或共享式模板方法,从而定义如何获取同步状态和释放同步状态的逻辑。

2.2.1 独占式

tryAcquire:尝试独占式获取同步状态,返回值为true则表示获取成功,否则获取失败。

tryRelease
尝试独占式释放同步状态,返回值为true则表示获取成功,否则获取失败。

2.2.2 共享式

tryAcquireShared:尝试共享式获取同步状态,当返回值为大于等于0的时获得同步状态成功,否则获取失败。

tryReleaseShared:尝试共享式释放同步状态,返回值为true则表示获取成功,否则获取失败。

2.3 如何基于AQS实现Semaphore

由于多个线程可以同时许可同时执行,当然我们选择使用共享同步,Sync需要重写 tryAcquire 获取同步状态条件逻辑,tryRelease释放同步条件逻辑。其核心点在于使用同步状态做判断。当同状态为0时,许可被使用完了,同步状态大于0,许可被还可用,每次调用tryAcquire同步状态-1,每次调用tryRelease同步状态+1

2.4 类结构

内部存在有三个内部类 Sync、NonfairSync 和 FairSync 类。

  • Sync 继承 AbstractQueuedSynchronizer 抽象类。
  • NonfairSync(非公平锁) 继承 Sync 抽象类。
  • FairSync(公平锁) 继承 Sync 抽象类。

Semaphore 很多方法都通过代理内部类的方法实现。

2.5 核心方法

公平信号量获取许可

/**
 * 公平信号量获取同步状态逻辑
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        /** 只有同步队列中不存在线程。且同步状态可以获取才能获取锁 **/
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

非公平信号量获取许可

/**
 * 非公平信号量获取同步状态逻辑
 * 返回true 表示获取同步状态成功
 * 返回false 表示获取同步状态失败
 */
final int nonfairTryAcquireShared(int acquires) {
    /** 循环+CAS **/
    for (;;) {
        /** 获取父类同步状态 **/
        int available = getState();
        /** 计算同步状态 -acquires **/
        int remaining = available - acquires;
        /** 使用CAS设置同步状态, **/
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

释放许可

 /**
 * 释放同步状态
 */
protected final boolean tryReleaseShared(int releases) {
    /** 循环+CAS **/
    for (;;) {
        /** 获取父类同步状态 **/
        int current = getState();
        /** 计算同步状态 +releases **/
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        /** 使用CAS设置同步状态, **/
        if (compareAndSetState(current, next))
            return true;
    }
}

完整源码

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;

    /**
     * Sync 继承 AbstractQueuedSynchronizer 抽象类
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        /**
         * 初始化同步状态
         */
        Sync(int permits) {
            setState(permits);
        }

        /**
         * 返回同步状态
         */
        final int getPermits() {
            return getState();
        }

        /**
         * 非公平信号量获取同步状态逻辑
         * 返回true 表示获取同步状态成功
         * 返回false 表示获取同步状态失败
         */
        final int nonfairTryAcquireShared(int acquires) {
            /** 循环+CAS **/
            for (;;) {
                /** 获取父类同步状态 **/
                int available = getState();
                /** 计算同步状态 -acquires **/
                int remaining = available - acquires;
                /** 使用CAS设置同步状态, **/
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * 释放同步状态
         */
        protected final boolean tryReleaseShared(int releases) {
            /** 循环+CAS **/
            for (;;) {
                /** 获取父类同步状态 **/
                int current = getState();
                /** 计算同步状态 +releases **/
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                /** 使用CAS设置同步状态, **/
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * 清理指定数量许可,修改同步状态
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * 清空许可,修改同步状态
         */
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }


    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        /**
         * 获取同步状态
         */
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }


    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /**
         * 公平信号量获取同步状态逻辑
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                /** 只有同步队列中不存在线程。且同步状态可以获取才能获取锁 **/
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**  创建具有给定的许可数和非公平的公平设置的 Semaphore。 **/
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**  创建具有给定的许可数和给定的公平设置的 Semaphore。 **/
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }



    /** 获取一个许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** 和acquireUninterruptibly功能,同时能响应中断  **/
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** 和acquireUninterruptibly功能,同时增加阻塞超时  **/
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** 尝试获取许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /** 获取指定数量的许可,获取失败将线程添加到同步队列,并阻塞,等待归还唤醒  **/
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /** 和acquireUninterruptibly(int )功能,同时能响应中断  **/
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /** 和acquire(int)功能,同时增加阻塞超时  **/
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }


    /** 尝试指定数量的许可,如果信号量中不存在许可之际返回false,成功返回true **/
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }


    /** 释放一个许可, **/
    public void release() {
        sync.releaseShared(1);
    }


    /** 释放指定数量许可, **/
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }


    /**
     * 获取许可的数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }


    /**
     * 清空许可
     */
    public int drainPermits() {
        return sync.drainPermits();
    }


    /**
     * 根据指定的缩减量减小可用许可的数目
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 判断当前对象是否是公平信号量
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }


    /**
     * 判断是否有线程在同步队列等待许可
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    /**
     * 获取等待许可的线程数量
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }


    /**
     * 获取等待许可的线程集合
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }


    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

相关文章

网友评论

      本文标题:J.U.C锁之 Semaphore

      本文链接:https://www.haomeiwen.com/subject/xvohkctx.html