美文网首页并发编程
信号量-semaphore -05

信号量-semaphore -05

作者: 愤怒的奶牛 | 来源:发表于2019-08-17 14:35 被阅读0次

并发中多个线程访问同一个资源可以通过同步解决,但是多个线程访问多个资源怎么解决呢?如公交车上有10个位置,但是现在有15个人,要控制这15个人来抢占位置。juc 引入semaphore 来解决这个问题。

/**
 * 信号量:控制多线程对多个资源的访问
 */
public class SemaphoreDemo {

    //设置10个许可,这个和线程池不一样,信号量里面并没有创建线程,而是通过 所谓的许可证(可理解为 token )来控制
    // 拿到 token 的就可以访问资源,到 token 被拿完了,就会被放入等待队列
    private static final Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) throws InterruptedException {
        for (int i=0;i<20;i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//获取执行许可
                    System.out.println("执行线程=" + Thread.currentThread().getName());
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();//释放
                }
            }).start();
        }
    }
}
jkaj4kz8mm.png

常用api

        int availablePermits = semaphore.availablePermits();//可用 token
        int drainPermits = semaphore.drainPermits();//获得并返回所有立即可用的许可证数量
        int queueLength = semaphore.getQueueLength();//返回当前可能在阻塞获取许可证线程的数量
        boolean hasQueuedThreads = semaphore.hasQueuedThreads();//查询是否有线程正在等待获取许可证
        boolean fair = semaphore.isFair();//返回是否为公平模式
        boolean tryAcquire = semaphore.tryAcquire();//非阻塞的获取一个许可证(token)

下面我们来稍微分析一下semaphore 的源码 。

//构造器,传入token 数量
  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
//构造器,传入token 数量,是否是公平锁、否公平锁
  public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
  • NonfairSync
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
      //父类构造器
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
  • Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
         // 构造器
        Sync(int permits) {
           // 使用 state 字段来作为 token 数量,在往上面就是 AQS ,暂时到这里。
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

通过上面的源码分析,我们看出 在我们 private static final Semaphore semaphore = new Semaphore(10); 时,设置了一个 state 字段来记录 token 的数量,也就时许可证的数量。下面我们来分析一下 semaphore.acquire(); 这个方法的实现。也就是现在我们的token 有了,就等 线程来取了, semaphore.acquire(); 就时获取token。

  • semaphore.acquire()
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync; 
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public void acquire() throws InterruptedException {
       // 进去,该方法在 AQS
        sync.acquireSharedInterruptibly(1);
    }
}
  • AQS
// arg == 1
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//判断当前线程是否中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)// tryAcquireShared(arg) 看一下这个方法
            doAcquireSharedInterruptibly(arg);
    }
  • tryAcquireShared
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }
// 因为前面我们初始化时选择的 是NonfairSync  ,所以具体实现在这里 acquires = 1
       protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);// 看一下这个方法
        }
}
  • nonfairTryAcquireShared
// Semaphore 内部类
   abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState(); // 我们初始化时 ,这个值 是10
        }

       // 这个方法就 线程来 获取token 的逻辑 acquires == 1
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {// 自旋
                int available = getState(); //  如果是第一个线程来,这个值就是初始值,否则就是 剩下的 token 数量。
              // 总的数量 - 1 ,每个 线程来都取走一个
                int remaining = available - acquires;
// 这里出 for 有两个条件,remaining < 0 表示 token 数量没有了,
//试想一下,初始化 10 个token ,共有15 个 ,每来一个线程 就 减1,第10 个的时候 remaining  = 0 ,
//这时刚好把token 用完,第11 个来时 就变成了 0-1 = -1 ,remaining = -1 ,并返回。
//在没有取完token 时,remaining < 0 这个条件不会满足。
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))// 第二个条件,CAS ,也就每次取走一个token ,就更新 state 的值,也就说剩下的token 数量。返回的是 10-1 = 9 (剩下的数量)
                    return remaining;
            }
        }
  • compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this, 这句就调用的 unsafe 对象 ,本地方法栈里面的。
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  • unsafe.compareAndSwapInt(this, stateOffset, expect, update);
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

  protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

unsafe.compareAndSwapInt(this, stateOffset, expect, update);
this :需要更新字段所在的对象
stateOffset : 字段 --> state
expect : 期望值
update : 更新值
更新之前都会 比较 传入的 expect 值是否和原来的值 相同,相同就 更新。如 当前 state =10 ,expect = 10,update =9 ,则 state = 9。

到这里我们就指定了 在 semaphore 信号量中,上层 semaphore.acquire(); 方法就是将底层 AQS 中的 state 值做修改,而所谓的 获取 执行许可(token )就是将 state 的值 -1 ,直到 取完为止。那么 如果 state = 10 ,但是现在有 15 个线程需要取,剩下5个没有获取成功的 线程又去哪里了呢?下面分析下这个逻辑:

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//  tryAcquireShared(arg) 这个方法关键。
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared(arg) : arg = 1 。前面我们说到这个方法 nonfairTryAcquireShared 里面是一个 自旋(for(;;)),里面有个if 条件判断,有两种 条件可以进去,下面我们再来看一看代码:

 final int nonfairTryAcquireShared(int acquires) {
            for (;;) {// 自旋
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || // 条件 1 
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

remaining < 0 。分析一下这个条件 什么时候满足:

试想 : state = 10 ,总共有15 个线程需要获取 许可(token),
当前面 10 都获取 成功后 state = 0,这是第11 个 过来了,
int available = getState(); // available = state = 0
int remaining = available - acquires; // 0-1 , remaining = -1 < 0。
将 return -1。

回到 :

  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) // 第11 个线程 满足 < 0 的条件。
            doAcquireSharedInterruptibly(arg);
    }
  • doAcquireSharedInterruptibly(arg); arg = 1
   /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // addWaiter() 在队列里面 添加一个节点。新加入的节点再队尾,返回 的是 上一个节点,
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {// 自旋,依次 去队列里面的前一个,
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg); // 再一次尝试 获取 
                    if (r >= 0) { //获取成功,节点被剔除队列。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

final Node node = addWaiter(Node.SHARED); 在 队列里面添加一个节点,也就说 没有获取到 许可(token) 的线程 被 包装在一个 Node 对象里面,并 添加到了一个队列里面。

  • addWaiter(Node node)
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

到这里我们就知道了,semaphore 中 获取 token 就是每次 将 state 的值 -1 ,当 state 的值 >=0 时 表示 线程有获取资源的权力,当 state 被消耗完了,下一个线程再来的时候,就会被放到一个队列里面,等待下次获取。

  • enq(node);
// 入队操作
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;// 队尾
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; // 当node 的 前一个 节点 设置为 t 
                if (compareAndSetTail(t, node)) { //设置 当前node 为 队尾
                    t.next = node; // 以前的 队尾 的下一个 节点 设置为 当前节点,当前节点 就成了对尾 
                    return t; //   Node t = tail;// 队尾
                }
            }
        }
    }

相关文章

网友评论

    本文标题:信号量-semaphore -05

    本文链接:https://www.haomeiwen.com/subject/jdmhsctx.html