美文网首页JUC 并发专题一些收藏
Java并发编程——Semaphore

Java并发编程——Semaphore

作者: 小波同学 | 来源:发表于2021-12-12 02:02 被阅读0次

一、Semaphore

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

信号量通过一组许可证来控制对共享资源的访问。

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程就会醒来尝试获取许可。

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;
  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
  • acquireUninterruptibly():尝试获取一个许可,不可中断;
  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
  • release():释放一个许可;
  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly():

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

二、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
  • 执行release的时候,state + releases,把许可加回去。

三、Semaphore用法

/**
 * @Description: 演示Semaphore用法
 */
public class SemaphoreDemo {

    public static Semaphore semaphore = new Semaphore(3,true);

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"拿到了许可证");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"释放了许可证");
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}

注意,如果使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

四、源码

公平信号量 和 非公平信号量 的区别

"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!

不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。

4.1 Semaphore构造方法

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}
  • 1、Semaphore 构造器,permits 为传入的许可证数,默认非公平构造器;

  • 2、Semaphore 构造器,permits 为传入的许可证数,fair 是 boolean 型的,如果传入 true,则公平,否则不公平;

4.2 NonfairSync 和 FairSync源码

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }   
    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

两者都继承了 Sync 同步器,初始化时都调用了父类构造器,同时都有一个获取信号的方法,稍后再分析获取信号的区别。

4.3 acquire(获取信号量)

  • 这个方法是从信号量获取一个许可,在获取到许可,或线程中断之前,当前线程阻塞;获取许可后立即返回并将许可数减一
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    /**
    * 如果没有许可可用,则会休眠,直到发生以下两种情况
    * 1、其他调用release方法释放许可,并且当前线程获取到许可
    * 2、其他线程中断了当前线程
    *   1)当前线程在进入这个方法时设置了中断标志位
    *   2)等待许可时发生了中断,则抛出中断异常
    */  
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
  • acquireSharedInterruptibly
    这个方法是直接调用AQS的acquireSharedInterruptibly(int ard)方法;
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
   /**
    * 首先检测是否中断.中断后抛出异常
    * 尝试获取许可,成功退出;失败则进入AQS队列,直至成功获取或中断
    */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取锁,返回剩余共享锁的数量;小于0则加入同步队列,自旋   
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
}

tryAcquireShared(arg)则会调用Semaphore中两个同步器的tryAcquireShared实现方法; 如果获取失败则加入队列等待唤醒;

4.4 非公平模式的实现

非公平实现都是首先查看是否有可获取的许可,如果有则获取成功,没有则进队列等待;利用此可以提高并发量

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    static final class NonfairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
}
  • 直接调用其父类Sync中非公平共享获取
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        final int nonfairTryAcquireShared(int acquires) {
            // 自旋直到无许可或者状态位赋值成功
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

通过自旋+CAS来一直尝试获取许可,直到获取成功或者没有许可,返回剩余的许可数

4.5 公平模式的实现

公平与非公平的区别在于始终按照AQS队列FIFO的顺序来的

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    static final class FairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
            //自旋 CAS 实现线程安全
            for (;;) {
                // 判断是否有前置任务排队
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                // 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

如果等待队列不为空,则直接返回-1。 以上两种模式获取失败后都会调用doAcquireSharedInterruptibly(int arg);自旋等待获取锁

  • doAcquireSharedInterruptibly方法:会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁
                // 判断新增的节点的前一个节点是否头节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 是头节点,那么在此尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取成功,把当前节点变为新的head节点,
                        //并且检查后续节点是否可以在共享模式下等待,
                        //并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //前继节点不是头节点,当前线程一直等待,直到获取到锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
  • shouldParkAfterFailedAcquire方法:判断当前线程获取锁失败之后是否需要挂起
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
 /*说明:4.shouldParkAfterFailedAcquire 返回当前线程是否应该阻塞
    (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值)

    CANCELLED[1]  -- 当前线程已被取消
    SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。
                        一般发生情况是:当前线程的后继线程处于阻塞状态,
                        而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
    PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
    [0]           -- 当前线程不属于上面的任何一种状态。
    
    (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

    规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
    规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
    规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
    */  
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
        if (ws == Node.SIGNAL)
            return true;
            
        // 如果前继节点是取消的状态即前驱节点状态为CANCELLED 
        if (ws > 0) {
            // 从队尾向前寻找第一个状态不为CANCELLED的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将前驱节点的状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }   
}   

4.6 void release()

公平和非公平使用相同的释放 释放许可

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    public void release() {
        sync.releaseShared(1);
    }
}
  • 调用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    //目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。
    //尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
    public final boolean releaseShared(int arg) {
        //释放共享锁
        if (tryReleaseShared(arg)) {
             //唤醒所有共享节点线程
            doReleaseShared();
            return true;
        }
        return false;
    }
}
  • tryReleaseShared()在Semaphore.Sync中被重写,释放共享锁,将锁计数器加回去
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // 获取“锁计数器”的状态
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 通过CAS函数进行赋值。 
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
}
  • 如果释放许可成功,则调用AQS中的doReleaseShared()方法来唤醒AQS队列中等待的线程
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 唤醒同步队列中的一个线程
     */ 
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //是否需要唤醒后继节点
                if (ws == Node.SIGNAL) {
                    //修改状态为初始0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒h.nex节点线程   
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
  • 1)获取队列的头节点元素,如果不为null,并且不为尾节点,说白了,就是不止一个人等待,进入判断。

  • 2)如果线程节点是需要唤醒的线程,则进行唤醒,获取资源使用。

  • 3)失败后重试。

  • 4)如果没有后继需要唤醒的节点,则退出,就相当于每人排队上厕所了,让出来资源就空着。

Semaphore 总结

  • 1、Semaphore 内部维护一组信号量,即一个 volatile 的整型 state 变量。

  • 2、Semaphore 分为公平或非公平两种方式,获取信号量或释放信号量的本质是对 state 进行原子的减少或增加操作。

  • 3、获取不到信号的线程放在等待队列里面,释放信号的时候会唤醒后继节点。

  • 4、Semaphore 主要用于对线程数量、公共资源(比如数据库连接池)等进行数量控制。

参考:
https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html

https://www.cnblogs.com/200911/p/6060359.html

https://juejin.cn/post/6844904119547723789

https://blog.csdn.net/yhl_jxy/article/details/87279383

相关文章

网友评论

    本文标题:Java并发编程——Semaphore

    本文链接:https://www.haomeiwen.com/subject/alglfrtx.html